Drop the debug level for logging fd creation to DEBUG_DEBUG
[sahlberg/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit > ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all node
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have
348    this can fail if the remove node has not yet been upgraded to 
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we dont need to ourself ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already have this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all database any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we dont need to ourself ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644                 if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* its already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
891 {
892         uint32_t timed_out = 0;
893         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
894         while (!timed_out) {
895                 event_loop_once(ctdb->ev);
896         }
897 }
898
899 /*
900   called when an election times out (ends)
901  */
902 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
903                                   struct timeval t, void *p)
904 {
905         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
906         rec->election_timeout = NULL;
907
908         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
909 }
910
911
912 /*
913   wait for an election to finish. It finished election_timeout seconds after
914   the last election packet is received
915  */
916 static void ctdb_wait_election(struct ctdb_recoverd *rec)
917 {
918         struct ctdb_context *ctdb = rec->ctdb;
919         while (rec->election_timeout) {
920                 event_loop_once(ctdb->ev);
921         }
922 }
923
924 /*
925   Update our local flags from all remote connected nodes. 
926   This is only run when we are or we belive we are the recovery master
927  */
928 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
929 {
930         int j;
931         struct ctdb_context *ctdb = rec->ctdb;
932         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
933
934         /* get the nodemap for all active remote nodes and verify
935            they are the same as for this node
936          */
937         for (j=0; j<nodemap->num; j++) {
938                 struct ctdb_node_map *remote_nodemap=NULL;
939                 int ret;
940
941                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
942                         continue;
943                 }
944                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
945                         continue;
946                 }
947
948                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
949                                            mem_ctx, &remote_nodemap);
950                 if (ret != 0) {
951                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
952                                   nodemap->nodes[j].pnn));
953                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
954                         talloc_free(mem_ctx);
955                         return MONITOR_FAILED;
956                 }
957                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
958                         /* We should tell our daemon about this so it
959                            updates its flags or else we will log the same 
960                            message again in the next iteration of recovery.
961                            Since we are the recovery master we can just as
962                            well update the flags on all nodes.
963                         */
964                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
965                         if (ret != 0) {
966                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
967                                 return -1;
968                         }
969
970                         /* Update our local copy of the flags in the recovery
971                            daemon.
972                         */
973                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
974                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
975                                  nodemap->nodes[j].flags));
976                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
977                 }
978                 talloc_free(remote_nodemap);
979         }
980         talloc_free(mem_ctx);
981         return MONITOR_OK;
982 }
983
984
985 /* Create a new random generation ip. 
986    The generation id can not be the INVALID_GENERATION id
987 */
988 static uint32_t new_generation(void)
989 {
990         uint32_t generation;
991
992         while (1) {
993                 generation = random();
994
995                 if (generation != INVALID_GENERATION) {
996                         break;
997                 }
998         }
999
1000         return generation;
1001 }
1002
1003
1004 /*
1005   create a temporary working database
1006  */
1007 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1008 {
1009         char *name;
1010         struct tdb_wrap *recdb;
1011         unsigned tdb_flags;
1012
1013         /* open up the temporary recovery database */
1014         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1015                                ctdb->db_directory_state,
1016                                ctdb->pnn);
1017         if (name == NULL) {
1018                 return NULL;
1019         }
1020         unlink(name);
1021
1022         tdb_flags = TDB_NOLOCK;
1023         if (ctdb->valgrinding) {
1024                 tdb_flags |= TDB_NOMMAP;
1025         }
1026         tdb_flags |= TDB_DISALLOW_NESTING;
1027
1028         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1029                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1030         if (recdb == NULL) {
1031                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1032         }
1033
1034         talloc_free(name);
1035
1036         return recdb;
1037 }
1038
1039
1040 /* 
1041    a traverse function for pulling all relevent records from recdb
1042  */
1043 struct recdb_data {
1044         struct ctdb_context *ctdb;
1045         struct ctdb_marshall_buffer *recdata;
1046         uint32_t len;
1047         bool failed;
1048         bool persistent;
1049 };
1050
1051 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1052 {
1053         struct recdb_data *params = (struct recdb_data *)p;
1054         struct ctdb_rec_data *rec;
1055         struct ctdb_ltdb_header *hdr;
1056
1057         /* skip empty records */
1058         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1059                 return 0;
1060         }
1061
1062         /* update the dmaster field to point to us */
1063         hdr = (struct ctdb_ltdb_header *)data.dptr;
1064         if (!params->persistent) {
1065                 hdr->dmaster = params->ctdb->pnn;
1066         }
1067
1068         /* add the record to the blob ready to send to the nodes */
1069         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1070         if (rec == NULL) {
1071                 params->failed = true;
1072                 return -1;
1073         }
1074         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1075         if (params->recdata == NULL) {
1076                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1077                          rec->length + params->len, params->recdata->count));
1078                 params->failed = true;
1079                 return -1;
1080         }
1081         params->recdata->count++;
1082         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1083         params->len += rec->length;
1084         talloc_free(rec);
1085
1086         return 0;
1087 }
1088
1089 /*
1090   push the recdb database out to all nodes
1091  */
1092 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1093                                bool persistent,
1094                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1095 {
1096         struct recdb_data params;
1097         struct ctdb_marshall_buffer *recdata;
1098         TDB_DATA outdata;
1099         TALLOC_CTX *tmp_ctx;
1100         uint32_t *nodes;
1101
1102         tmp_ctx = talloc_new(ctdb);
1103         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1104
1105         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1106         CTDB_NO_MEMORY(ctdb, recdata);
1107
1108         recdata->db_id = dbid;
1109
1110         params.ctdb = ctdb;
1111         params.recdata = recdata;
1112         params.len = offsetof(struct ctdb_marshall_buffer, data);
1113         params.failed = false;
1114         params.persistent = persistent;
1115
1116         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1118                 talloc_free(params.recdata);
1119                 talloc_free(tmp_ctx);
1120                 return -1;
1121         }
1122
1123         if (params.failed) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1125                 talloc_free(params.recdata);
1126                 talloc_free(tmp_ctx);
1127                 return -1;              
1128         }
1129
1130         recdata = params.recdata;
1131
1132         outdata.dptr = (void *)recdata;
1133         outdata.dsize = params.len;
1134
1135         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1136         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1137                                         nodes, 0,
1138                                         CONTROL_TIMEOUT(), false, outdata,
1139                                         NULL, NULL,
1140                                         NULL) != 0) {
1141                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1142                 talloc_free(recdata);
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1148                   dbid, recdata->count));
1149
1150         talloc_free(recdata);
1151         talloc_free(tmp_ctx);
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   go through a full recovery on one database 
1159  */
1160 static int recover_database(struct ctdb_recoverd *rec, 
1161                             TALLOC_CTX *mem_ctx,
1162                             uint32_t dbid,
1163                             bool persistent,
1164                             uint32_t pnn, 
1165                             struct ctdb_node_map *nodemap,
1166                             uint32_t transaction_id)
1167 {
1168         struct tdb_wrap *recdb;
1169         int ret;
1170         struct ctdb_context *ctdb = rec->ctdb;
1171         TDB_DATA data;
1172         struct ctdb_control_wipe_database w;
1173         uint32_t *nodes;
1174
1175         recdb = create_recdb(ctdb, mem_ctx);
1176         if (recdb == NULL) {
1177                 return -1;
1178         }
1179
1180         /* pull all remote databases onto the recdb */
1181         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1182         if (ret != 0) {
1183                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1184                 return -1;
1185         }
1186
1187         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1188
1189         /* wipe all the remote databases. This is safe as we are in a transaction */
1190         w.db_id = dbid;
1191         w.transaction_id = transaction_id;
1192
1193         data.dptr = (void *)&w;
1194         data.dsize = sizeof(w);
1195
1196         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1198                                         nodes, 0,
1199                                         CONTROL_TIMEOUT(), false, data,
1200                                         NULL, NULL,
1201                                         NULL) != 0) {
1202                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1203                 talloc_free(recdb);
1204                 return -1;
1205         }
1206         
1207         /* push out the correct database. This sets the dmaster and skips 
1208            the empty records */
1209         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1210         if (ret != 0) {
1211                 talloc_free(recdb);
1212                 return -1;
1213         }
1214
1215         /* all done with this database */
1216         talloc_free(recdb);
1217
1218         return 0;
1219 }
1220
1221 /*
1222   reload the nodes file 
1223 */
1224 static void reload_nodes_file(struct ctdb_context *ctdb)
1225 {
1226         ctdb->nodes = NULL;
1227         ctdb_load_nodes_file(ctdb);
1228 }
1229
1230 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1231                                          struct ctdb_node_map *nodemap,
1232                                          uint32_t *culprit)
1233 {
1234         int j;
1235         int ret;
1236
1237         if (ctdb->num_nodes != nodemap->num) {
1238                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1239                                   ctdb->num_nodes, nodemap->num));
1240                 if (culprit) {
1241                         *culprit = ctdb->pnn;
1242                 }
1243                 return -1;
1244         }
1245
1246         for (j=0; j<nodemap->num; j++) {
1247                 /* release any existing data */
1248                 if (ctdb->nodes[j]->known_public_ips) {
1249                         talloc_free(ctdb->nodes[j]->known_public_ips);
1250                         ctdb->nodes[j]->known_public_ips = NULL;
1251                 }
1252                 if (ctdb->nodes[j]->available_public_ips) {
1253                         talloc_free(ctdb->nodes[j]->available_public_ips);
1254                         ctdb->nodes[j]->available_public_ips = NULL;
1255                 }
1256
1257                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1258                         continue;
1259                 }
1260
1261                 /* grab a new shiny list of public ips from the node */
1262                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1263                                         CONTROL_TIMEOUT(),
1264                                         ctdb->nodes[j]->pnn,
1265                                         ctdb->nodes,
1266                                         0,
1267                                         &ctdb->nodes[j]->known_public_ips);
1268                 if (ret != 0) {
1269                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1270                                 ctdb->nodes[j]->pnn));
1271                         if (culprit) {
1272                                 *culprit = ctdb->nodes[j]->pnn;
1273                         }
1274                         return -1;
1275                 }
1276
1277                 /* grab a new shiny list of public ips from the node */
1278                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1279                                         CONTROL_TIMEOUT(),
1280                                         ctdb->nodes[j]->pnn,
1281                                         ctdb->nodes,
1282                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1283                                         &ctdb->nodes[j]->available_public_ips);
1284                 if (ret != 0) {
1285                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1286                                 ctdb->nodes[j]->pnn));
1287                         if (culprit) {
1288                                 *culprit = ctdb->nodes[j]->pnn;
1289                         }
1290                         return -1;
1291                 }
1292         }
1293
1294         return 0;
1295 }
1296
1297 /*
1298   we are the recmaster, and recovery is needed - start a recovery run
1299  */
1300 static int do_recovery(struct ctdb_recoverd *rec, 
1301                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1302                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1303 {
1304         struct ctdb_context *ctdb = rec->ctdb;
1305         int i, j, ret;
1306         uint32_t generation;
1307         struct ctdb_dbid_map *dbmap;
1308         TDB_DATA data;
1309         uint32_t *nodes;
1310         struct timeval start_time;
1311         uint32_t culprit = (uint32_t)-1;
1312
1313         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1314
1315         /* if recovery fails, force it again */
1316         rec->need_recovery = true;
1317
1318         for (i=0; i<ctdb->num_nodes; i++) {
1319                 struct ctdb_banning_state *ban_state;
1320
1321                 if (ctdb->nodes[i]->ban_state == NULL) {
1322                         continue;
1323                 }
1324                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1325                 if (ban_state->count < 2*ctdb->num_nodes) {
1326                         continue;
1327                 }
1328                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1329                         ctdb->nodes[i]->pnn, ban_state->count,
1330                         ctdb->tunable.recovery_ban_period));
1331                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1332                 ban_state->count = 0;
1333         }
1334
1335
1336         if (ctdb->tunable.verify_recovery_lock != 0) {
1337                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1338                 start_time = timeval_current();
1339                 if (!ctdb_recovery_lock(ctdb, true)) {
1340                         ctdb_set_culprit(rec, pnn);
1341                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1342                         return -1;
1343                 }
1344                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1345                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1346         }
1347
1348         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1349
1350         /* get a list of all databases */
1351         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1352         if (ret != 0) {
1353                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1354                 return -1;
1355         }
1356
1357         /* we do the db creation before we set the recovery mode, so the freeze happens
1358            on all databases we will be dealing with. */
1359
1360         /* verify that we have all the databases any other node has */
1361         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1362         if (ret != 0) {
1363                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1364                 return -1;
1365         }
1366
1367         /* verify that all other nodes have all our databases */
1368         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1369         if (ret != 0) {
1370                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1371                 return -1;
1372         }
1373         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1374
1375         /* update the database priority for all remote databases */
1376         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1377         if (ret != 0) {
1378                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1379         }
1380         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1381
1382
1383         /* set recovery mode to active on all nodes */
1384         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1385         if (ret != 0) {
1386                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1387                 return -1;
1388         }
1389
1390         /* execute the "startrecovery" event script on all nodes */
1391         ret = run_startrecovery_eventscript(rec, nodemap);
1392         if (ret!=0) {
1393                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1394                 return -1;
1395         }
1396
1397         /*
1398           update all nodes to have the same flags that we have
1399          */
1400         for (i=0;i<nodemap->num;i++) {
1401                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1402                         continue;
1403                 }
1404
1405                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1406                 if (ret != 0) {
1407                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1408                         return -1;
1409                 }
1410         }
1411
1412         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1413
1414         /* pick a new generation number */
1415         generation = new_generation();
1416
1417         /* change the vnnmap on this node to use the new generation 
1418            number but not on any other nodes.
1419            this guarantees that if we abort the recovery prematurely
1420            for some reason (a node stops responding?)
1421            that we can just return immediately and we will reenter
1422            recovery shortly again.
1423            I.e. we deliberately leave the cluster with an inconsistent
1424            generation id to allow us to abort recovery at any stage and
1425            just restart it from scratch.
1426          */
1427         vnnmap->generation = generation;
1428         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1429         if (ret != 0) {
1430                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1431                 return -1;
1432         }
1433
1434         data.dptr = (void *)&generation;
1435         data.dsize = sizeof(uint32_t);
1436
1437         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1438         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1439                                         nodes, 0,
1440                                         CONTROL_TIMEOUT(), false, data,
1441                                         NULL,
1442                                         transaction_start_fail_callback,
1443                                         rec) != 0) {
1444                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1445                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1446                                         nodes, 0,
1447                                         CONTROL_TIMEOUT(), false, tdb_null,
1448                                         NULL,
1449                                         NULL,
1450                                         NULL) != 0) {
1451                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1452                 }
1453                 return -1;
1454         }
1455
1456         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1457
1458         for (i=0;i<dbmap->num;i++) {
1459                 ret = recover_database(rec, mem_ctx,
1460                                        dbmap->dbs[i].dbid,
1461                                        dbmap->dbs[i].persistent,
1462                                        pnn, nodemap, generation);
1463                 if (ret != 0) {
1464                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1465                         return -1;
1466                 }
1467         }
1468
1469         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1470
1471         /* commit all the changes */
1472         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1473                                         nodes, 0,
1474                                         CONTROL_TIMEOUT(), false, data,
1475                                         NULL, NULL,
1476                                         NULL) != 0) {
1477                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1478                 return -1;
1479         }
1480
1481         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1482         
1483
1484         /* update the capabilities for all nodes */
1485         ret = update_capabilities(ctdb, nodemap);
1486         if (ret!=0) {
1487                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1488                 return -1;
1489         }
1490
1491         /* build a new vnn map with all the currently active and
1492            unbanned nodes */
1493         generation = new_generation();
1494         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1495         CTDB_NO_MEMORY(ctdb, vnnmap);
1496         vnnmap->generation = generation;
1497         vnnmap->size = 0;
1498         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1499         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1500         for (i=j=0;i<nodemap->num;i++) {
1501                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1502                         continue;
1503                 }
1504                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1505                         /* this node can not be an lmaster */
1506                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1507                         continue;
1508                 }
1509
1510                 vnnmap->size++;
1511                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1512                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1513                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1514
1515         }
1516         if (vnnmap->size == 0) {
1517                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1518                 vnnmap->size++;
1519                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1520                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1521                 vnnmap->map[0] = pnn;
1522         }       
1523
1524         /* update to the new vnnmap on all nodes */
1525         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1526         if (ret != 0) {
1527                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1528                 return -1;
1529         }
1530
1531         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1532
1533         /* update recmaster to point to us for all nodes */
1534         ret = set_recovery_master(ctdb, nodemap, pnn);
1535         if (ret!=0) {
1536                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1537                 return -1;
1538         }
1539
1540         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1541
1542         /*
1543           update all nodes to have the same flags that we have
1544          */
1545         for (i=0;i<nodemap->num;i++) {
1546                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1547                         continue;
1548                 }
1549
1550                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1551                 if (ret != 0) {
1552                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1553                         return -1;
1554                 }
1555         }
1556
1557         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1558
1559         /* disable recovery mode */
1560         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1561         if (ret != 0) {
1562                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1563                 return -1;
1564         }
1565
1566         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1567
1568         /*
1569           tell nodes to takeover their public IPs
1570          */
1571         ret = ctdb_reload_remote_public_ips(ctdb, nodemap, &culprit);
1572         if (ret != 0) {
1573                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1574                                  culprit));
1575                 return -1;
1576         }
1577         rec->need_takeover_run = false;
1578         ret = ctdb_takeover_run(ctdb, nodemap);
1579         if (ret != 0) {
1580                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1581                 return -1;
1582         }
1583         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1584
1585         /* execute the "recovered" event script on all nodes */
1586         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1587         if (ret!=0) {
1588                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1589                 return -1;
1590         }
1591
1592         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1593
1594         /* send a message to all clients telling them that the cluster 
1595            has been reconfigured */
1596         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1597
1598         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1599
1600         rec->need_recovery = false;
1601
1602         /* we managed to complete a full recovery, make sure to forgive
1603            any past sins by the nodes that could now participate in the
1604            recovery.
1605         */
1606         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1607         for (i=0;i<nodemap->num;i++) {
1608                 struct ctdb_banning_state *ban_state;
1609
1610                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1611                         continue;
1612                 }
1613
1614                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1615                 if (ban_state == NULL) {
1616                         continue;
1617                 }
1618
1619                 ban_state->count = 0;
1620         }
1621
1622
1623         /* We just finished a recovery successfully. 
1624            We now wait for rerecovery_timeout before we allow 
1625            another recovery to take place.
1626         */
1627         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1628         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1629         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1630
1631         return 0;
1632 }
1633
1634
1635 /*
1636   elections are won by first checking the number of connected nodes, then
1637   the priority time, then the pnn
1638  */
1639 struct election_message {
1640         uint32_t num_connected;
1641         struct timeval priority_time;
1642         uint32_t pnn;
1643         uint32_t node_flags;
1644 };
1645
1646 /*
1647   form this nodes election data
1648  */
1649 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1650 {
1651         int ret, i;
1652         struct ctdb_node_map *nodemap;
1653         struct ctdb_context *ctdb = rec->ctdb;
1654
1655         ZERO_STRUCTP(em);
1656
1657         em->pnn = rec->ctdb->pnn;
1658         em->priority_time = rec->priority_time;
1659
1660         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1661         if (ret != 0) {
1662                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1663                 return;
1664         }
1665
1666         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1667         em->node_flags = rec->node_flags;
1668
1669         for (i=0;i<nodemap->num;i++) {
1670                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1671                         em->num_connected++;
1672                 }
1673         }
1674
1675         /* we shouldnt try to win this election if we cant be a recmaster */
1676         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1677                 em->num_connected = 0;
1678                 em->priority_time = timeval_current();
1679         }
1680
1681         talloc_free(nodemap);
1682 }
1683
1684 /*
1685   see if the given election data wins
1686  */
1687 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1688 {
1689         struct election_message myem;
1690         int cmp = 0;
1691
1692         ctdb_election_data(rec, &myem);
1693
1694         /* we cant win if we dont have the recmaster capability */
1695         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1696                 return false;
1697         }
1698
1699         /* we cant win if we are banned */
1700         if (rec->node_flags & NODE_FLAGS_BANNED) {
1701                 return false;
1702         }       
1703
1704         /* we cant win if we are stopped */
1705         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1706                 return false;
1707         }       
1708
1709         /* we will automatically win if the other node is banned */
1710         if (em->node_flags & NODE_FLAGS_BANNED) {
1711                 return true;
1712         }
1713
1714         /* we will automatically win if the other node is banned */
1715         if (em->node_flags & NODE_FLAGS_STOPPED) {
1716                 return true;
1717         }
1718
1719         /* try to use the most connected node */
1720         if (cmp == 0) {
1721                 cmp = (int)myem.num_connected - (int)em->num_connected;
1722         }
1723
1724         /* then the longest running node */
1725         if (cmp == 0) {
1726                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1727         }
1728
1729         if (cmp == 0) {
1730                 cmp = (int)myem.pnn - (int)em->pnn;
1731         }
1732
1733         return cmp > 0;
1734 }
1735
1736 /*
1737   send out an election request
1738  */
1739 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1740 {
1741         int ret;
1742         TDB_DATA election_data;
1743         struct election_message emsg;
1744         uint64_t srvid;
1745         struct ctdb_context *ctdb = rec->ctdb;
1746
1747         srvid = CTDB_SRVID_RECOVERY;
1748
1749         ctdb_election_data(rec, &emsg);
1750
1751         election_data.dsize = sizeof(struct election_message);
1752         election_data.dptr  = (unsigned char *)&emsg;
1753
1754
1755         /* send an election message to all active nodes */
1756         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1757         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1758
1759
1760         /* A new node that is already frozen has entered the cluster.
1761            The existing nodes are not frozen and dont need to be frozen
1762            until the election has ended and we start the actual recovery
1763         */
1764         if (update_recmaster == true) {
1765                 /* first we assume we will win the election and set 
1766                    recoverymaster to be ourself on the current node
1767                  */
1768                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1769                 if (ret != 0) {
1770                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1771                         return -1;
1772                 }
1773         }
1774
1775
1776         return 0;
1777 }
1778
1779 /*
1780   this function will unban all nodes in the cluster
1781 */
1782 static void unban_all_nodes(struct ctdb_context *ctdb)
1783 {
1784         int ret, i;
1785         struct ctdb_node_map *nodemap;
1786         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1787         
1788         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1789         if (ret != 0) {
1790                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1791                 return;
1792         }
1793
1794         for (i=0;i<nodemap->num;i++) {
1795                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1796                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1797                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1798                 }
1799         }
1800
1801         talloc_free(tmp_ctx);
1802 }
1803
1804
1805 /*
1806   we think we are winning the election - send a broadcast election request
1807  */
1808 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1809 {
1810         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1811         int ret;
1812
1813         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1814         if (ret != 0) {
1815                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1816         }
1817
1818         talloc_free(rec->send_election_te);
1819         rec->send_election_te = NULL;
1820 }
1821
1822 /*
1823   handler for memory dumps
1824 */
1825 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1826                              TDB_DATA data, void *private_data)
1827 {
1828         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1829         TDB_DATA *dump;
1830         int ret;
1831         struct rd_memdump_reply *rd;
1832
1833         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1834                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1835                 talloc_free(tmp_ctx);
1836                 return;
1837         }
1838         rd = (struct rd_memdump_reply *)data.dptr;
1839
1840         dump = talloc_zero(tmp_ctx, TDB_DATA);
1841         if (dump == NULL) {
1842                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1843                 talloc_free(tmp_ctx);
1844                 return;
1845         }
1846         ret = ctdb_dump_memory(ctdb, dump);
1847         if (ret != 0) {
1848                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1849                 talloc_free(tmp_ctx);
1850                 return;
1851         }
1852
1853 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1854
1855         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1856         if (ret != 0) {
1857                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1858                 talloc_free(tmp_ctx);
1859                 return;
1860         }
1861
1862         talloc_free(tmp_ctx);
1863 }
1864
1865 /*
1866   handler for reload_nodes
1867 */
1868 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1869                              TDB_DATA data, void *private_data)
1870 {
1871         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1872
1873         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1874
1875         reload_nodes_file(rec->ctdb);
1876 }
1877
1878
1879 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1880                               struct timeval yt, void *p)
1881 {
1882         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1883
1884         talloc_free(rec->ip_check_disable_ctx);
1885         rec->ip_check_disable_ctx = NULL;
1886 }
1887
1888 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1889                              TDB_DATA data, void *private_data)
1890 {
1891         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1892         uint32_t timeout;
1893
1894         if (rec->ip_check_disable_ctx != NULL) {
1895                 talloc_free(rec->ip_check_disable_ctx);
1896                 rec->ip_check_disable_ctx = NULL;
1897         }
1898
1899         if (data.dsize != sizeof(uint32_t)) {
1900                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1901                                  "expexting %lu\n", (long unsigned)data.dsize,
1902                                  (long unsigned)sizeof(uint32_t)));
1903                 return;
1904         }
1905         if (data.dptr == NULL) {
1906                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1907                 return;
1908         }
1909
1910         timeout = *((uint32_t *)data.dptr);
1911         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1912
1913         rec->ip_check_disable_ctx = talloc_new(rec);
1914         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1915
1916         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1917 }
1918
1919
1920 /*
1921   handler for ip reallocate, just add it to the list of callers and 
1922   handle this later in the monitor_cluster loop so we do not recurse
1923   with other callers to takeover_run()
1924 */
1925 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1926                              TDB_DATA data, void *private_data)
1927 {
1928         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1929         struct ip_reallocate_list *caller;
1930
1931         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1932                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1933                 return;
1934         }
1935
1936         if (rec->ip_reallocate_ctx == NULL) {
1937                 rec->ip_reallocate_ctx = talloc_new(rec);
1938                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1939         }
1940
1941         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1942         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1943
1944         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1945         caller->next = rec->reallocate_callers;
1946         rec->reallocate_callers = caller;
1947
1948         return;
1949 }
1950
1951 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1952 {
1953         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1954         TDB_DATA result;
1955         int32_t ret;
1956         struct ip_reallocate_list *callers;
1957         uint32_t culprit;
1958
1959         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1960
1961         /* update the list of public ips that a node can handle for
1962            all connected nodes
1963         */
1964         ret = ctdb_reload_remote_public_ips(ctdb, rec->nodemap, &culprit);
1965         if (ret != 0) {
1966                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1967                                  culprit));
1968                 rec->need_takeover_run = true;
1969         }
1970         if (ret == 0) {
1971                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
1972                 if (ret != 0) {
1973                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1974                                          culprit));
1975                         rec->need_takeover_run = true;
1976                 }
1977         }
1978
1979         result.dsize = sizeof(int32_t);
1980         result.dptr  = (uint8_t *)&ret;
1981
1982         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1983
1984                 /* Someone that sent srvid==0 does not want a reply */
1985                 if (callers->rd->srvid == 0) {
1986                         continue;
1987                 }
1988                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1989                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1990                                   (unsigned long long)callers->rd->srvid));
1991                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1992                 if (ret != 0) {
1993                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1994                                          "message to %u:%llu\n",
1995                                          (unsigned)callers->rd->pnn,
1996                                          (unsigned long long)callers->rd->srvid));
1997                 }
1998         }
1999
2000         talloc_free(tmp_ctx);
2001         talloc_free(rec->ip_reallocate_ctx);
2002         rec->ip_reallocate_ctx = NULL;
2003         rec->reallocate_callers = NULL;
2004         
2005 }
2006
2007
2008 /*
2009   handler for recovery master elections
2010 */
2011 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2012                              TDB_DATA data, void *private_data)
2013 {
2014         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2015         int ret;
2016         struct election_message *em = (struct election_message *)data.dptr;
2017         TALLOC_CTX *mem_ctx;
2018
2019         /* we got an election packet - update the timeout for the election */
2020         talloc_free(rec->election_timeout);
2021         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2022                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2023                                                 ctdb_election_timeout, rec);
2024
2025         mem_ctx = talloc_new(ctdb);
2026
2027         /* someone called an election. check their election data
2028            and if we disagree and we would rather be the elected node, 
2029            send a new election message to all other nodes
2030          */
2031         if (ctdb_election_win(rec, em)) {
2032                 if (!rec->send_election_te) {
2033                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2034                                                                 timeval_current_ofs(0, 500000),
2035                                                                 election_send_request, rec);
2036                 }
2037                 talloc_free(mem_ctx);
2038                 /*unban_all_nodes(ctdb);*/
2039                 return;
2040         }
2041         
2042         /* we didn't win */
2043         talloc_free(rec->send_election_te);
2044         rec->send_election_te = NULL;
2045
2046         if (ctdb->tunable.verify_recovery_lock != 0) {
2047                 /* release the recmaster lock */
2048                 if (em->pnn != ctdb->pnn &&
2049                     ctdb->recovery_lock_fd != -1) {
2050                         close(ctdb->recovery_lock_fd);
2051                         ctdb->recovery_lock_fd = -1;
2052                         unban_all_nodes(ctdb);
2053                 }
2054         }
2055
2056         /* ok, let that guy become recmaster then */
2057         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2058         if (ret != 0) {
2059                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2060                 talloc_free(mem_ctx);
2061                 return;
2062         }
2063
2064         talloc_free(mem_ctx);
2065         return;
2066 }
2067
2068
2069 /*
2070   force the start of the election process
2071  */
2072 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2073                            struct ctdb_node_map *nodemap)
2074 {
2075         int ret;
2076         struct ctdb_context *ctdb = rec->ctdb;
2077
2078         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2079
2080         /* set all nodes to recovery mode to stop all internode traffic */
2081         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2082         if (ret != 0) {
2083                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2084                 return;
2085         }
2086
2087         talloc_free(rec->election_timeout);
2088         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2089                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2090                                                 ctdb_election_timeout, rec);
2091
2092         ret = send_election_request(rec, pnn, true);
2093         if (ret!=0) {
2094                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2095                 return;
2096         }
2097
2098         /* wait for a few seconds to collect all responses */
2099         ctdb_wait_election(rec);
2100 }
2101
2102
2103
2104 /*
2105   handler for when a node changes its flags
2106 */
2107 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2108                             TDB_DATA data, void *private_data)
2109 {
2110         int ret;
2111         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2112         struct ctdb_node_map *nodemap=NULL;
2113         TALLOC_CTX *tmp_ctx;
2114         uint32_t changed_flags;
2115         int i;
2116         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2117         int disabled_flag_changed;
2118
2119         if (data.dsize != sizeof(*c)) {
2120                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2121                 return;
2122         }
2123
2124         tmp_ctx = talloc_new(ctdb);
2125         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2126
2127         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2128         if (ret != 0) {
2129                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2130                 talloc_free(tmp_ctx);
2131                 return;         
2132         }
2133
2134
2135         for (i=0;i<nodemap->num;i++) {
2136                 if (nodemap->nodes[i].pnn == c->pnn) break;
2137         }
2138
2139         if (i == nodemap->num) {
2140                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2141                 talloc_free(tmp_ctx);
2142                 return;
2143         }
2144
2145         changed_flags = c->old_flags ^ c->new_flags;
2146
2147         if (nodemap->nodes[i].flags != c->new_flags) {
2148                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2149         }
2150
2151         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2152
2153         nodemap->nodes[i].flags = c->new_flags;
2154
2155         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2156                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2157
2158         if (ret == 0) {
2159                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2160                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2161         }
2162         
2163         if (ret == 0 &&
2164             ctdb->recovery_master == ctdb->pnn &&
2165             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2166                 /* Only do the takeover run if the perm disabled or unhealthy
2167                    flags changed since these will cause an ip failover but not
2168                    a recovery.
2169                    If the node became disconnected or banned this will also
2170                    lead to an ip address failover but that is handled 
2171                    during recovery
2172                 */
2173                 if (disabled_flag_changed) {
2174                         rec->need_takeover_run = true;
2175                 }
2176         }
2177
2178         talloc_free(tmp_ctx);
2179 }
2180
2181 /*
2182   handler for when we need to push out flag changes ot all other nodes
2183 */
2184 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2185                             TDB_DATA data, void *private_data)
2186 {
2187         int ret;
2188         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2189         struct ctdb_node_map *nodemap=NULL;
2190         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2191         uint32_t recmaster;
2192         uint32_t *nodes;
2193
2194         /* find the recovery master */
2195         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2196         if (ret != 0) {
2197                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2198                 talloc_free(tmp_ctx);
2199                 return;
2200         }
2201
2202         /* read the node flags from the recmaster */
2203         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2204         if (ret != 0) {
2205                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2206                 talloc_free(tmp_ctx);
2207                 return;
2208         }
2209         if (c->pnn >= nodemap->num) {
2210                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2211                 talloc_free(tmp_ctx);
2212                 return;
2213         }
2214
2215         /* send the flags update to all connected nodes */
2216         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2217
2218         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2219                                       nodes, 0, CONTROL_TIMEOUT(),
2220                                       false, data,
2221                                       NULL, NULL,
2222                                       NULL) != 0) {
2223                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2224
2225                 talloc_free(tmp_ctx);
2226                 return;
2227         }
2228
2229         talloc_free(tmp_ctx);
2230 }
2231
2232
2233 struct verify_recmode_normal_data {
2234         uint32_t count;
2235         enum monitor_result status;
2236 };
2237
2238 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2239 {
2240         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2241
2242
2243         /* one more node has responded with recmode data*/
2244         rmdata->count--;
2245
2246         /* if we failed to get the recmode, then return an error and let
2247            the main loop try again.
2248         */
2249         if (state->state != CTDB_CONTROL_DONE) {
2250                 if (rmdata->status == MONITOR_OK) {
2251                         rmdata->status = MONITOR_FAILED;
2252                 }
2253                 return;
2254         }
2255
2256         /* if we got a response, then the recmode will be stored in the
2257            status field
2258         */
2259         if (state->status != CTDB_RECOVERY_NORMAL) {
2260                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2261                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2262         }
2263
2264         return;
2265 }
2266
2267
2268 /* verify that all nodes are in normal recovery mode */
2269 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2270 {
2271         struct verify_recmode_normal_data *rmdata;
2272         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2273         struct ctdb_client_control_state *state;
2274         enum monitor_result status;
2275         int j;
2276         
2277         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2278         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2279         rmdata->count  = 0;
2280         rmdata->status = MONITOR_OK;
2281
2282         /* loop over all active nodes and send an async getrecmode call to 
2283            them*/
2284         for (j=0; j<nodemap->num; j++) {
2285                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2286                         continue;
2287                 }
2288                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2289                                         CONTROL_TIMEOUT(), 
2290                                         nodemap->nodes[j].pnn);
2291                 if (state == NULL) {
2292                         /* we failed to send the control, treat this as 
2293                            an error and try again next iteration
2294                         */                      
2295                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2296                         talloc_free(mem_ctx);
2297                         return MONITOR_FAILED;
2298                 }
2299
2300                 /* set up the callback functions */
2301                 state->async.fn = verify_recmode_normal_callback;
2302                 state->async.private_data = rmdata;
2303
2304                 /* one more control to wait for to complete */
2305                 rmdata->count++;
2306         }
2307
2308
2309         /* now wait for up to the maximum number of seconds allowed
2310            or until all nodes we expect a response from has replied
2311         */
2312         while (rmdata->count > 0) {
2313                 event_loop_once(ctdb->ev);
2314         }
2315
2316         status = rmdata->status;
2317         talloc_free(mem_ctx);
2318         return status;
2319 }
2320
2321
2322 struct verify_recmaster_data {
2323         struct ctdb_recoverd *rec;
2324         uint32_t count;
2325         uint32_t pnn;
2326         enum monitor_result status;
2327 };
2328
2329 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2330 {
2331         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2332
2333
2334         /* one more node has responded with recmaster data*/
2335         rmdata->count--;
2336
2337         /* if we failed to get the recmaster, then return an error and let
2338            the main loop try again.
2339         */
2340         if (state->state != CTDB_CONTROL_DONE) {
2341                 if (rmdata->status == MONITOR_OK) {
2342                         rmdata->status = MONITOR_FAILED;
2343                 }
2344                 return;
2345         }
2346
2347         /* if we got a response, then the recmaster will be stored in the
2348            status field
2349         */
2350         if (state->status != rmdata->pnn) {
2351                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2352                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2353                 rmdata->status = MONITOR_ELECTION_NEEDED;
2354         }
2355
2356         return;
2357 }
2358
2359
2360 /* verify that all nodes agree that we are the recmaster */
2361 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2362 {
2363         struct ctdb_context *ctdb = rec->ctdb;
2364         struct verify_recmaster_data *rmdata;
2365         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2366         struct ctdb_client_control_state *state;
2367         enum monitor_result status;
2368         int j;
2369         
2370         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2371         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2372         rmdata->rec    = rec;
2373         rmdata->count  = 0;
2374         rmdata->pnn    = pnn;
2375         rmdata->status = MONITOR_OK;
2376
2377         /* loop over all active nodes and send an async getrecmaster call to 
2378            them*/
2379         for (j=0; j<nodemap->num; j++) {
2380                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2381                         continue;
2382                 }
2383                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2384                                         CONTROL_TIMEOUT(),
2385                                         nodemap->nodes[j].pnn);
2386                 if (state == NULL) {
2387                         /* we failed to send the control, treat this as 
2388                            an error and try again next iteration
2389                         */                      
2390                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2391                         talloc_free(mem_ctx);
2392                         return MONITOR_FAILED;
2393                 }
2394
2395                 /* set up the callback functions */
2396                 state->async.fn = verify_recmaster_callback;
2397                 state->async.private_data = rmdata;
2398
2399                 /* one more control to wait for to complete */
2400                 rmdata->count++;
2401         }
2402
2403
2404         /* now wait for up to the maximum number of seconds allowed
2405            or until all nodes we expect a response from has replied
2406         */
2407         while (rmdata->count > 0) {
2408                 event_loop_once(ctdb->ev);
2409         }
2410
2411         status = rmdata->status;
2412         talloc_free(mem_ctx);
2413         return status;
2414 }
2415
2416
2417 /* called to check that the allocation of public ip addresses is ok.
2418 */
2419 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2420 {
2421         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2422         struct ctdb_control_get_ifaces *ifaces = NULL;
2423         struct ctdb_all_public_ips *ips = NULL;
2424         struct ctdb_uptime *uptime1 = NULL;
2425         struct ctdb_uptime *uptime2 = NULL;
2426         int ret, j;
2427         bool need_iface_check = false;
2428         bool need_takeover_run = false;
2429
2430         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2431                                 CTDB_CURRENT_NODE, &uptime1);
2432         if (ret != 0) {
2433                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2434                 talloc_free(mem_ctx);
2435                 return -1;
2436         }
2437
2438
2439         /* read the interfaces from the local node */
2440         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2441         if (ret != 0) {
2442                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2443                 talloc_free(mem_ctx);
2444                 return -1;
2445         }
2446
2447         if (!rec->ifaces) {
2448                 need_iface_check = true;
2449         } else if (rec->ifaces->num != ifaces->num) {
2450                 need_iface_check = true;
2451         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2452                 need_iface_check = true;
2453         }
2454
2455         if (need_iface_check) {
2456                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2457                                      "local node %u - force takeover run\n",
2458                                      pnn));
2459                 need_takeover_run = true;
2460         }
2461
2462         /* read the ip allocation from the local node */
2463         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2464         if (ret != 0) {
2465                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2466                 talloc_free(mem_ctx);
2467                 return -1;
2468         }
2469
2470         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2471                                 CTDB_CURRENT_NODE, &uptime2);
2472         if (ret != 0) {
2473                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2474                 talloc_free(mem_ctx);
2475                 return -1;
2476         }
2477
2478         /* skip the check if the startrecovery time has changed */
2479         if (timeval_compare(&uptime1->last_recovery_started,
2480                             &uptime2->last_recovery_started) != 0) {
2481                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2482                 talloc_free(mem_ctx);
2483                 return 0;
2484         }
2485
2486         /* skip the check if the endrecovery time has changed */
2487         if (timeval_compare(&uptime1->last_recovery_finished,
2488                             &uptime2->last_recovery_finished) != 0) {
2489                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2490                 talloc_free(mem_ctx);
2491                 return 0;
2492         }
2493
2494         /* skip the check if we have started but not finished recovery */
2495         if (timeval_compare(&uptime1->last_recovery_finished,
2496                             &uptime1->last_recovery_started) != 1) {
2497                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2498                 talloc_free(mem_ctx);
2499
2500                 return 0;
2501         }
2502
2503         talloc_free(rec->ifaces);
2504         rec->ifaces = talloc_steal(rec, ifaces);
2505
2506         /* verify that we have the ip addresses we should have
2507            and we dont have ones we shouldnt have.
2508            if we find an inconsistency we set recmode to
2509            active on the local node and wait for the recmaster
2510            to do a full blown recovery
2511         */
2512         for (j=0; j<ips->num; j++) {
2513                 if (ips->ips[j].pnn == pnn) {
2514                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2515                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2516                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2517                                 need_takeover_run = true;
2518                         }
2519                 } else {
2520                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2521                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2522                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2523                                 need_takeover_run = true;
2524                         }
2525                 }
2526         }
2527
2528         if (need_takeover_run) {
2529                 struct takeover_run_reply rd;
2530                 TDB_DATA data;
2531
2532                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2533
2534                 rd.pnn = ctdb->pnn;
2535                 rd.srvid = 0;
2536                 data.dptr = (uint8_t *)&rd;
2537                 data.dsize = sizeof(rd);
2538
2539                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2540                 if (ret != 0) {
2541                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2542                 }
2543         }
2544         talloc_free(mem_ctx);
2545         return 0;
2546 }
2547
2548
2549 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2550 {
2551         struct ctdb_node_map **remote_nodemaps = callback_data;
2552
2553         if (node_pnn >= ctdb->num_nodes) {
2554                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2555                 return;
2556         }
2557
2558         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2559
2560 }
2561
2562 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2563         struct ctdb_node_map *nodemap,
2564         struct ctdb_node_map **remote_nodemaps)
2565 {
2566         uint32_t *nodes;
2567
2568         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2569         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2570                                         nodes, 0,
2571                                         CONTROL_TIMEOUT(), false, tdb_null,
2572                                         async_getnodemap_callback,
2573                                         NULL,
2574                                         remote_nodemaps) != 0) {
2575                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2576
2577                 return -1;
2578         }
2579
2580         return 0;
2581 }
2582
2583 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2584 struct ctdb_check_reclock_state {
2585         struct ctdb_context *ctdb;
2586         struct timeval start_time;
2587         int fd[2];
2588         pid_t child;
2589         struct timed_event *te;
2590         struct fd_event *fde;
2591         enum reclock_child_status status;
2592 };
2593
2594 /* when we free the reclock state we must kill any child process.
2595 */
2596 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2597 {
2598         struct ctdb_context *ctdb = state->ctdb;
2599
2600         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2601
2602         if (state->fd[0] != -1) {
2603                 close(state->fd[0]);
2604                 state->fd[0] = -1;
2605         }
2606         if (state->fd[1] != -1) {
2607                 close(state->fd[1]);
2608                 state->fd[1] = -1;
2609         }
2610         kill(state->child, SIGKILL);
2611         return 0;
2612 }
2613
2614 /*
2615   called if our check_reclock child times out. this would happen if
2616   i/o to the reclock file blocks.
2617  */
2618 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2619                                          struct timeval t, void *private_data)
2620 {
2621         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2622                                            struct ctdb_check_reclock_state);
2623
2624         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2625         state->status = RECLOCK_TIMEOUT;
2626 }
2627
2628 /* this is called when the child process has completed checking the reclock
2629    file and has written data back to us through the pipe.
2630 */
2631 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2632                              uint16_t flags, void *private_data)
2633 {
2634         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2635                                              struct ctdb_check_reclock_state);
2636         char c = 0;
2637         int ret;
2638
2639         /* we got a response from our child process so we can abort the
2640            timeout.
2641         */
2642         talloc_free(state->te);
2643         state->te = NULL;
2644
2645         ret = read(state->fd[0], &c, 1);
2646         if (ret != 1 || c != RECLOCK_OK) {
2647                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2648                 state->status = RECLOCK_FAILED;
2649
2650                 return;
2651         }
2652
2653         state->status = RECLOCK_OK;
2654         return;
2655 }
2656
2657 static int check_recovery_lock(struct ctdb_context *ctdb)
2658 {
2659         int ret;
2660         struct ctdb_check_reclock_state *state;
2661         pid_t parent = getpid();
2662
2663         if (ctdb->recovery_lock_fd == -1) {
2664                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2665                 return -1;
2666         }
2667
2668         state = talloc(ctdb, struct ctdb_check_reclock_state);
2669         CTDB_NO_MEMORY(ctdb, state);
2670
2671         state->ctdb = ctdb;
2672         state->start_time = timeval_current();
2673         state->status = RECLOCK_CHECKING;
2674         state->fd[0] = -1;
2675         state->fd[1] = -1;
2676
2677         ret = pipe(state->fd);
2678         if (ret != 0) {
2679                 talloc_free(state);
2680                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2681                 return -1;
2682         }
2683
2684         state->child = fork();
2685         if (state->child == (pid_t)-1) {
2686                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2687                 close(state->fd[0]);
2688                 state->fd[0] = -1;
2689                 close(state->fd[1]);
2690                 state->fd[1] = -1;
2691                 talloc_free(state);
2692                 return -1;
2693         }
2694
2695         if (state->child == 0) {
2696                 char cc = RECLOCK_OK;
2697                 close(state->fd[0]);
2698                 state->fd[0] = -1;
2699
2700                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2701                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2702                         cc = RECLOCK_FAILED;
2703                 }
2704
2705                 write(state->fd[1], &cc, 1);
2706                 /* make sure we die when our parent dies */
2707                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2708                         sleep(5);
2709                         write(state->fd[1], &cc, 1);
2710                 }
2711                 _exit(0);
2712         }
2713         close(state->fd[1]);
2714         state->fd[1] = -1;
2715         set_close_on_exec(state->fd[0]);
2716
2717         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2718
2719         talloc_set_destructor(state, check_reclock_destructor);
2720
2721         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2722                                     ctdb_check_reclock_timeout, state);
2723         if (state->te == NULL) {
2724                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2725                 talloc_free(state);
2726                 return -1;
2727         }
2728
2729         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2730                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2731                                 reclock_child_handler,
2732                                 (void *)state);
2733
2734         if (state->fde == NULL) {
2735                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2736                 talloc_free(state);
2737                 return -1;
2738         }
2739
2740         while (state->status == RECLOCK_CHECKING) {
2741                 event_loop_once(ctdb->ev);
2742         }
2743
2744         if (state->status == RECLOCK_FAILED) {
2745                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2746                 close(ctdb->recovery_lock_fd);
2747                 ctdb->recovery_lock_fd = -1;
2748                 talloc_free(state);
2749                 return -1;
2750         }
2751
2752         talloc_free(state);
2753         return 0;
2754 }
2755
2756 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2757 {
2758         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2759         const char *reclockfile;
2760
2761         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2762                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2763                 talloc_free(tmp_ctx);
2764                 return -1;      
2765         }
2766
2767         if (reclockfile == NULL) {
2768                 if (ctdb->recovery_lock_file != NULL) {
2769                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2770                         talloc_free(ctdb->recovery_lock_file);
2771                         ctdb->recovery_lock_file = NULL;
2772                         if (ctdb->recovery_lock_fd != -1) {
2773                                 close(ctdb->recovery_lock_fd);
2774                                 ctdb->recovery_lock_fd = -1;
2775                         }
2776                 }
2777                 ctdb->tunable.verify_recovery_lock = 0;
2778                 talloc_free(tmp_ctx);
2779                 return 0;
2780         }
2781
2782         if (ctdb->recovery_lock_file == NULL) {
2783                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2784                 if (ctdb->recovery_lock_fd != -1) {
2785                         close(ctdb->recovery_lock_fd);
2786                         ctdb->recovery_lock_fd = -1;
2787                 }
2788                 talloc_free(tmp_ctx);
2789                 return 0;
2790         }
2791
2792
2793         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2794                 talloc_free(tmp_ctx);
2795                 return 0;
2796         }
2797
2798         talloc_free(ctdb->recovery_lock_file);
2799         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2800         ctdb->tunable.verify_recovery_lock = 0;
2801         if (ctdb->recovery_lock_fd != -1) {
2802                 close(ctdb->recovery_lock_fd);
2803                 ctdb->recovery_lock_fd = -1;
2804         }
2805
2806         talloc_free(tmp_ctx);
2807         return 0;
2808 }
2809                 
2810 /*
2811   the main monitoring loop
2812  */
2813 static void monitor_cluster(struct ctdb_context *ctdb)
2814 {
2815         uint32_t pnn;
2816         TALLOC_CTX *mem_ctx=NULL;
2817         struct ctdb_node_map *nodemap=NULL;
2818         struct ctdb_node_map *recmaster_nodemap=NULL;
2819         struct ctdb_node_map **remote_nodemaps=NULL;
2820         struct ctdb_vnn_map *vnnmap=NULL;
2821         struct ctdb_vnn_map *remote_vnnmap=NULL;
2822         int32_t debug_level;
2823         int i, j, ret;
2824         struct ctdb_recoverd *rec;
2825
2826         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2827
2828         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2829         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2830
2831         rec->ctdb = ctdb;
2832
2833         rec->priority_time = timeval_current();
2834
2835         /* register a message port for sending memory dumps */
2836         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2837
2838         /* register a message port for recovery elections */
2839         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2840
2841         /* when nodes are disabled/enabled */
2842         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2843
2844         /* when we are asked to puch out a flag change */
2845         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2846
2847         /* register a message port for vacuum fetch */
2848         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2849
2850         /* register a message port for reloadnodes  */
2851         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2852
2853         /* register a message port for performing a takeover run */
2854         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2855
2856         /* register a message port for disabling the ip check for a short while */
2857         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2858
2859 again:
2860         if (mem_ctx) {
2861                 talloc_free(mem_ctx);
2862                 mem_ctx = NULL;
2863         }
2864         mem_ctx = talloc_new(ctdb);
2865         if (!mem_ctx) {
2866                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2867                 exit(-1);
2868         }
2869
2870         /* we only check for recovery once every second */
2871         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2872
2873         /* verify that the main daemon is still running */
2874         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2875                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2876                 exit(-1);
2877         }
2878
2879         /* ping the local daemon to tell it we are alive */
2880         ctdb_ctrl_recd_ping(ctdb);
2881
2882         if (rec->election_timeout) {
2883                 /* an election is in progress */
2884                 goto again;
2885         }
2886
2887         /* read the debug level from the parent and update locally */
2888         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2889         if (ret !=0) {
2890                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2891                 goto again;
2892         }
2893         LogLevel = debug_level;
2894
2895
2896         /* We must check if we need to ban a node here but we want to do this
2897            as early as possible so we dont wait until we have pulled the node
2898            map from the local node. thats why we have the hardcoded value 20
2899         */
2900         for (i=0; i<ctdb->num_nodes; i++) {
2901                 struct ctdb_banning_state *ban_state;
2902
2903                 if (ctdb->nodes[i]->ban_state == NULL) {
2904                         continue;
2905                 }
2906                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2907                 if (ban_state->count < 20) {
2908                         continue;
2909                 }
2910                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2911                         ctdb->nodes[i]->pnn, ban_state->count,
2912                         ctdb->tunable.recovery_ban_period));
2913                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2914                 ban_state->count = 0;
2915         }
2916
2917         /* get relevant tunables */
2918         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2919         if (ret != 0) {
2920                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2921                 goto again;
2922         }
2923
2924         /* get the current recovery lock file from the server */
2925         if (update_recovery_lock_file(ctdb) != 0) {
2926                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2927                 goto again;
2928         }
2929
2930         /* Make sure that if recovery lock verification becomes disabled when
2931            we close the file
2932         */
2933         if (ctdb->tunable.verify_recovery_lock == 0) {
2934                 if (ctdb->recovery_lock_fd != -1) {
2935                         close(ctdb->recovery_lock_fd);
2936                         ctdb->recovery_lock_fd = -1;
2937                 }
2938         }
2939
2940         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2941         if (pnn == (uint32_t)-1) {
2942                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2943                 goto again;
2944         }
2945
2946         /* get the vnnmap */
2947         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2948         if (ret != 0) {
2949                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2950                 goto again;
2951         }
2952
2953
2954         /* get number of nodes */
2955         if (rec->nodemap) {
2956                 talloc_free(rec->nodemap);
2957                 rec->nodemap = NULL;
2958                 nodemap=NULL;
2959         }
2960         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2961         if (ret != 0) {
2962                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2963                 goto again;
2964         }
2965         nodemap = rec->nodemap;
2966
2967         /* check which node is the recovery master */
2968         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2969         if (ret != 0) {
2970                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2971                 goto again;
2972         }
2973
2974         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2975         if (rec->recmaster != pnn) {
2976                 if (rec->ip_reallocate_ctx != NULL) {
2977                         talloc_free(rec->ip_reallocate_ctx);
2978                         rec->ip_reallocate_ctx = NULL;
2979                         rec->reallocate_callers = NULL;
2980                 }
2981         }
2982         /* if there are takeovers requested, perform it and notify the waiters */
2983         if (rec->reallocate_callers) {
2984                 process_ipreallocate_requests(ctdb, rec);
2985         }
2986
2987         if (rec->recmaster == (uint32_t)-1) {
2988                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2989                 force_election(rec, pnn, nodemap);
2990                 goto again;
2991         }
2992
2993
2994         /* if the local daemon is STOPPED, we verify that the databases are
2995            also frozen and thet the recmode is set to active 
2996         */
2997         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2998                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2999                 if (ret != 0) {
3000                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3001                 }
3002                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3003                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3004
3005                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3006                         if (ret != 0) {
3007                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3008                                 goto again;
3009                         }
3010                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3011                         if (ret != 0) {
3012                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3013
3014                                 goto again;
3015                         }
3016                         goto again;
3017                 }
3018         }
3019         /* If the local node is stopped, verify we are not the recmaster 
3020            and yield this role if so
3021         */
3022         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3023                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3024                 force_election(rec, pnn, nodemap);
3025                 goto again;
3026         }
3027         
3028         /* check that we (recovery daemon) and the local ctdb daemon
3029            agrees on whether we are banned or not
3030         */
3031 //qqq
3032
3033         /* remember our own node flags */
3034         rec->node_flags = nodemap->nodes[pnn].flags;
3035
3036         /* count how many active nodes there are */
3037         rec->num_active    = 0;
3038         rec->num_connected = 0;
3039         for (i=0; i<nodemap->num; i++) {
3040                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3041                         rec->num_active++;
3042                 }
3043                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3044                         rec->num_connected++;
3045                 }
3046         }
3047
3048
3049         /* verify that the recmaster node is still active */
3050         for (j=0; j<nodemap->num; j++) {
3051                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3052                         break;
3053                 }
3054         }
3055
3056         if (j == nodemap->num) {
3057                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3058                 force_election(rec, pnn, nodemap);
3059                 goto again;
3060         }
3061
3062         /* if recovery master is disconnected we must elect a new recmaster */
3063         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3064                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3065                 force_election(rec, pnn, nodemap);
3066                 goto again;
3067         }
3068
3069         /* grap the nodemap from the recovery master to check if it is banned */
3070         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3071                                    mem_ctx, &recmaster_nodemap);
3072         if (ret != 0) {
3073                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3074                           nodemap->nodes[j].pnn));
3075                 goto again;
3076         }
3077
3078
3079         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3080                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3081                 force_election(rec, pnn, nodemap);
3082                 goto again;
3083         }
3084
3085
3086         /* verify that we have all ip addresses we should have and we dont
3087          * have addresses we shouldnt have.
3088          */ 
3089         if (ctdb->do_checkpublicip) {
3090                 if (rec->ip_check_disable_ctx == NULL) {
3091                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
3092                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3093                         }
3094                 }
3095         }
3096
3097
3098         /* if we are not the recmaster then we do not need to check
3099            if recovery is needed
3100          */
3101         if (pnn != rec->recmaster) {
3102                 goto again;
3103         }
3104
3105
3106         /* ensure our local copies of flags are right */
3107         ret = update_local_flags(rec, nodemap);
3108         if (ret == MONITOR_ELECTION_NEEDED) {
3109                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3110                 force_election(rec, pnn, nodemap);
3111                 goto again;
3112         }
3113         if (ret != MONITOR_OK) {
3114                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3115                 goto again;
3116         }
3117
3118         if (ctdb->num_nodes != nodemap->num) {
3119                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3120                 reload_nodes_file(ctdb);
3121                 goto again;
3122         }
3123
3124         /* verify that all active nodes agree that we are the recmaster */
3125         switch (verify_recmaster(rec, nodemap, pnn)) {
3126         case MONITOR_RECOVERY_NEEDED:
3127                 /* can not happen */
3128                 goto again;
3129         case MONITOR_ELECTION_NEEDED:
3130                 force_election(rec, pnn, nodemap);
3131                 goto again;
3132         case MONITOR_OK:
3133                 break;
3134         case MONITOR_FAILED:
3135                 goto again;
3136         }
3137
3138
3139         if (rec->need_recovery) {
3140                 /* a previous recovery didn't finish */
3141                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3142                 goto again;             
3143         }
3144
3145         /* verify that all active nodes are in normal mode 
3146            and not in recovery mode 
3147         */
3148         switch (verify_recmode(ctdb, nodemap)) {
3149         case MONITOR_RECOVERY_NEEDED:
3150                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3151                 goto again;
3152         case MONITOR_FAILED:
3153                 goto again;
3154         case MONITOR_ELECTION_NEEDED:
3155                 /* can not happen */
3156         case MONITOR_OK:
3157                 break;
3158         }
3159
3160
3161         if (ctdb->tunable.verify_recovery_lock != 0) {
3162                 /* we should have the reclock - check its not stale */
3163                 ret = check_recovery_lock(ctdb);
3164                 if (ret != 0) {
3165                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3166                         ctdb_set_culprit(rec, ctdb->pnn);
3167                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3168                         goto again;
3169                 }
3170         }
3171
3172         /* get the nodemap for all active remote nodes
3173          */
3174         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3175         if (remote_nodemaps == NULL) {
3176                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3177                 goto again;
3178         }
3179         for(i=0; i<nodemap->num; i++) {
3180                 remote_nodemaps[i] = NULL;
3181         }
3182         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3183                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3184                 goto again;
3185         } 
3186
3187         /* verify that all other nodes have the same nodemap as we have
3188         */
3189         for (j=0; j<nodemap->num; j++) {
3190                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3191                         continue;
3192                 }
3193
3194                 if (remote_nodemaps[j] == NULL) {
3195                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3196                         ctdb_set_culprit(rec, j);
3197
3198                         goto again;
3199                 }
3200
3201                 /* if the nodes disagree on how many nodes there are
3202                    then this is a good reason to try recovery
3203                  */
3204                 if (remote_nodemaps[j]->num != nodemap->num) {
3205                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3206                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3207                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3208                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3209                         goto again;
3210                 }
3211
3212                 /* if the nodes disagree on which nodes exist and are
3213                    active, then that is also a good reason to do recovery
3214                  */
3215                 for (i=0;i<nodemap->num;i++) {
3216                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3217                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3218                                           nodemap->nodes[j].pnn, i, 
3219                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3220                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3221                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3222                                             vnnmap);
3223                                 goto again;
3224                         }
3225                 }
3226
3227                 /* verify the flags are consistent
3228                 */
3229                 for (i=0; i<nodemap->num; i++) {
3230                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3231                                 continue;
3232                         }
3233                         
3234                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3235                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3236                                   nodemap->nodes[j].pnn, 
3237                                   nodemap->nodes[i].pnn, 
3238                                   remote_nodemaps[j]->nodes[i].flags,
3239                                   nodemap->nodes[j].flags));
3240                                 if (i == j) {
3241                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3242                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3243                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3244                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3245                                                     vnnmap);
3246                                         goto again;
3247                                 } else {
3248                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3249                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3250                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3251                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3252                                                     vnnmap);
3253                                         goto again;
3254                                 }
3255                         }
3256                 }
3257         }
3258
3259
3260         /* there better be the same number of lmasters in the vnn map
3261            as there are active nodes or we will have to do a recovery
3262          */
3263         if (vnnmap->size != rec->num_active) {
3264                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3265                           vnnmap->size, rec->num_active));
3266                 ctdb_set_culprit(rec, ctdb->pnn);
3267                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3268                 goto again;
3269         }
3270
3271         /* verify that all active nodes in the nodemap also exist in 
3272            the vnnmap.
3273          */
3274         for (j=0; j<nodemap->num; j++) {
3275                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3276                         continue;
3277                 }
3278                 if (nodemap->nodes[j].pnn == pnn) {
3279                         continue;
3280                 }
3281
3282                 for (i=0; i<vnnmap->size; i++) {
3283                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3284                                 break;
3285                         }
3286                 }
3287                 if (i == vnnmap->size) {
3288                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3289                                   nodemap->nodes[j].pnn));
3290                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3291                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3292                         goto again;
3293                 }
3294         }
3295
3296         
3297         /* verify that all other nodes have the same vnnmap
3298            and are from the same generation
3299          */
3300         for (j=0; j<nodemap->num; j++) {
3301                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3302                         continue;
3303                 }
3304                 if (nodemap->nodes[j].pnn == pnn) {
3305                         continue;
3306                 }
3307
3308                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3309                                           mem_ctx, &remote_vnnmap);
3310                 if (ret != 0) {
3311                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3312                                   nodemap->nodes[j].pnn));
3313                         goto again;
3314                 }
3315
3316                 /* verify the vnnmap generation is the same */
3317                 if (vnnmap->generation != remote_vnnmap->generation) {
3318                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3319                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3320                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3321                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3322                         goto again;
3323                 }
3324
3325                 /* verify the vnnmap size is the same */
3326                 if (vnnmap->size != remote_vnnmap->size) {
3327                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3328                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3329                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3330                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3331                         goto again;
3332                 }
3333
3334                 /* verify the vnnmap is the same */
3335                 for (i=0;i<vnnmap->size;i++) {
3336                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3337                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3338                                           nodemap->nodes[j].pnn));
3339                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3340                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3341                                             vnnmap);
3342                                 goto again;
3343                         }
3344                 }
3345         }
3346
3347         /* we might need to change who has what IP assigned */
3348         if (rec->need_takeover_run) {
3349                 uint32_t culprit = (uint32_t)-1;
3350
3351                 rec->need_takeover_run = false;
3352
3353                 /* update the list of public ips that a node can handle for
3354                    all connected nodes
3355                 */
3356                 ret = ctdb_reload_remote_public_ips(ctdb, nodemap, &culprit);
3357                 if (ret != 0) {
3358                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3359                                          culprit));
3360                         ctdb_set_culprit(rec, culprit);
3361                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3362                         goto again;
3363                 }
3364
3365                 /* execute the "startrecovery" event script on all nodes */
3366                 ret = run_startrecovery_eventscript(rec, nodemap);
3367                 if (ret!=0) {
3368                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3369                         ctdb_set_culprit(rec, ctdb->pnn);
3370                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3371                         goto again;
3372                 }
3373
3374                 ret = ctdb_takeover_run(ctdb, nodemap);
3375                 if (ret != 0) {
3376                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3377                         ctdb_set_culprit(rec, ctdb->pnn);
3378                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3379                         goto again;
3380                 }
3381
3382                 /* execute the "recovered" event script on all nodes */
3383                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3384 #if 0
3385 // we cant check whether the event completed successfully
3386 // since this script WILL fail if the node is in recovery mode
3387 // and if that race happens, the code here would just cause a second
3388 // cascading recovery.
3389                 if (ret!=0) {
3390                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3391                         ctdb_set_culprit(rec, ctdb->pnn);
3392                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3393                 }
3394 #endif
3395         }
3396
3397
3398         goto again;
3399
3400 }
3401
3402 /*
3403   event handler for when the main ctdbd dies
3404  */
3405 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3406                                  uint16_t flags, void *private_data)
3407 {
3408         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3409         _exit(1);
3410 }
3411
3412 /*
3413   called regularly to verify that the recovery daemon is still running
3414  */
3415 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3416                               struct timeval yt, void *p)
3417 {
3418         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3419
3420         if (kill(ctdb->recoverd_pid, 0) != 0) {
3421                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3422
3423                 ctdb_stop_recoverd(ctdb);
3424                 ctdb_stop_keepalive(ctdb);
3425                 ctdb_stop_monitoring(ctdb);
3426                 ctdb_release_all_ips(ctdb);
3427                 if (ctdb->methods != NULL) {
3428                         ctdb->methods->shutdown(ctdb);
3429                 }
3430                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3431
3432                 exit(10);       
3433         }
3434
3435         event_add_timed(ctdb->ev, ctdb, 
3436                         timeval_current_ofs(30, 0),
3437                         ctdb_check_recd, ctdb);
3438 }
3439
3440 static void recd_sig_child_handler(struct event_context *ev,
3441         struct signal_event *se, int signum, int count,
3442         void *dont_care, 
3443         void *private_data)
3444 {
3445 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3446         int status;
3447         pid_t pid = -1;
3448
3449         while (pid != 0) {
3450                 pid = waitpid(-1, &status, WNOHANG);
3451                 if (pid == -1) {
3452                         if (errno != ECHILD) {
3453                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3454                         }
3455                         return;
3456                 }
3457                 if (pid > 0) {
3458                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3459                 }
3460         }
3461 }
3462
3463 /*
3464   startup the recovery daemon as a child of the main ctdb daemon
3465  */
3466 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3467 {
3468         int fd[2];
3469         struct signal_event *se;
3470
3471         if (pipe(fd) != 0) {
3472                 return -1;
3473         }
3474
3475         ctdb->ctdbd_pid = getpid();
3476
3477         ctdb->recoverd_pid = fork();
3478         if (ctdb->recoverd_pid == -1) {
3479                 return -1;
3480         }
3481         
3482         if (ctdb->recoverd_pid != 0) {
3483                 close(fd[0]);
3484                 event_add_timed(ctdb->ev, ctdb, 
3485                                 timeval_current_ofs(30, 0),
3486                                 ctdb_check_recd, ctdb);
3487                 return 0;
3488         }
3489
3490         close(fd[1]);
3491
3492         srandom(getpid() ^ time(NULL));
3493
3494         if (switch_from_server_to_client(ctdb) != 0) {
3495                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3496                 exit(1);
3497         }
3498
3499         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3500
3501         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3502                      ctdb_recoverd_parent, &fd[0]);     
3503
3504         /* set up a handler to pick up sigchld */
3505         se = event_add_signal(ctdb->ev, ctdb,
3506                                      SIGCHLD, 0,
3507                                      recd_sig_child_handler,
3508                                      ctdb);
3509         if (se == NULL) {
3510                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3511                 exit(1);
3512         }
3513
3514         monitor_cluster(ctdb);
3515
3516         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3517         return -1;
3518 }
3519
3520 /*
3521   shutdown the recovery daemon
3522  */
3523 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3524 {
3525         if (ctdb->recoverd_pid == 0) {
3526                 return;
3527         }
3528
3529         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3530         kill(ctdb->recoverd_pid, SIGTERM);
3531 }