server/ctdb_recoverd.c (from rusty/ctdb.git)
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
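/* per-control and monitoring timeouts, taken from the recover_timeout and
   recover_interval tunables */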
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
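                        /* freeze the databases one priority at a time (1..NUM_DB_PRIORITIES)
                           so that every database is locked before the recovery mode is
                           switched below */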
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have.
348    This can fail if the remote node has not yet been upgraded to 
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check our own node */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check our own node */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
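                        /* only take the pulled record if it has a higher rsn than the
                           record we already have, or the same rsn while the existing
                           record's dmaster is not the recovery master */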
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* it's already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
891 {
892         uint32_t timed_out = 0;
893         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
894         while (!timed_out) {
895                 event_loop_once(ctdb->ev);
896         }
897 }
898
899 /*
900   called when an election times out (ends)
901  */
902 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
903                                   struct timeval t, void *p)
904 {
905         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
906         rec->election_timeout = NULL;
907
908         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
909 }
910
911
912 /*
913   wait for an election to finish. It finishes election_timeout seconds after
914   the last election packet is received
915  */
916 static void ctdb_wait_election(struct ctdb_recoverd *rec)
917 {
918         struct ctdb_context *ctdb = rec->ctdb;
919         while (rec->election_timeout) {
920                 event_loop_once(ctdb->ev);
921         }
922 }
923
924 /*
925   Update our local flags from all remote connected nodes. 
926   This is only run when we are, or believe we are, the recovery master
927  */
928 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
929 {
930         int j;
931         struct ctdb_context *ctdb = rec->ctdb;
932         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
933
934         /* get the nodemap for all active remote nodes and verify
935            they are the same as for this node
936          */
937         for (j=0; j<nodemap->num; j++) {
938                 struct ctdb_node_map *remote_nodemap=NULL;
939                 int ret;
940
941                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
942                         continue;
943                 }
944                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
945                         continue;
946                 }
947
948                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
949                                            mem_ctx, &remote_nodemap);
950                 if (ret != 0) {
951                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
952                                   nodemap->nodes[j].pnn));
953                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
954                         talloc_free(mem_ctx);
955                         return MONITOR_FAILED;
956                 }
957                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
958                         /* We should tell our daemon about this so it
959                            updates its flags or else we will log the same 
960                            message again in the next iteration of recovery.
961                            Since we are the recovery master we can just as
962                            well update the flags on all nodes.
963                         */
964                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
965                         if (ret != 0) {
966                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
967                                 return MONITOR_FAILED;
968                         }
969
970                         /* Update our local copy of the flags in the recovery
971                            daemon.
972                         */
973                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
974                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
975                                  nodemap->nodes[j].flags));
976                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
977                 }
978                 talloc_free(remote_nodemap);
979         }
980         talloc_free(mem_ctx);
981         return MONITOR_OK;
982 }
983
984
985 /* Create a new random generation id.
986    The generation id cannot be the INVALID_GENERATION id
987 */
988 static uint32_t new_generation(void)
989 {
990         uint32_t generation;
991
992         while (1) {
993                 generation = random();
994
995                 if (generation != INVALID_GENERATION) {
996                         break;
997                 }
998         }
999
1000         return generation;
1001 }
1002
1003
1004 /*
1005   create a temporary working database
1006  */
1007 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1008 {
1009         char *name;
1010         struct tdb_wrap *recdb;
1011         unsigned tdb_flags;
1012
1013         /* open up the temporary recovery database */
1014         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1015                                ctdb->db_directory_state,
1016                                ctdb->pnn);
1017         if (name == NULL) {
1018                 return NULL;
1019         }
1020         unlink(name);
1021
1022         tdb_flags = TDB_NOLOCK;
1023         if (ctdb->valgrinding) {
1024                 tdb_flags |= TDB_NOMMAP;
1025         }
1026         tdb_flags |= TDB_DISALLOW_NESTING;
1027
1028         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1029                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1030         if (recdb == NULL) {
1031                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1032         }
1033
1034         talloc_free(name);
1035
1036         return recdb;
1037 }
1038
1039
1040 /* 
1041    a traverse function for pulling all relevant records from recdb
1042  */
1043 struct recdb_data {
1044         struct ctdb_context *ctdb;
1045         struct ctdb_marshall_buffer *recdata;
1046         uint32_t len;
1047         bool failed;
1048         bool persistent;
1049 };
1050
1051 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1052 {
1053         struct recdb_data *params = (struct recdb_data *)p;
1054         struct ctdb_rec_data *rec;
1055         struct ctdb_ltdb_header *hdr;
1056
1057         /* skip empty records */
1058         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1059                 return 0;
1060         }
1061
1062         /* update the dmaster field to point to us */
1063         hdr = (struct ctdb_ltdb_header *)data.dptr;
1064         if (!params->persistent) {
1065                 hdr->dmaster = params->ctdb->pnn;
1066         }
1067
1068         /* add the record to the blob ready to send to the nodes */
1069         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1070         if (rec == NULL) {
1071                 params->failed = true;
1072                 return -1;
1073         }
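        /* grow the marshall buffer and append the packed record at its current end */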
1074         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1075         if (params->recdata == NULL) {
1076                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1077                          rec->length + params->len, params->recdata->count));
1078                 params->failed = true;
1079                 return -1;
1080         }
1081         params->recdata->count++;
1082         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1083         params->len += rec->length;
1084         talloc_free(rec);
1085
1086         return 0;
1087 }
1088
1089 /*
1090   push the recdb database out to all nodes
1091  */
1092 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1093                                bool persistent,
1094                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1095 {
1096         struct recdb_data params;
1097         struct ctdb_marshall_buffer *recdata;
1098         TDB_DATA outdata;
1099         TALLOC_CTX *tmp_ctx;
1100         uint32_t *nodes;
1101
1102         tmp_ctx = talloc_new(ctdb);
1103         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1104
1105         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1106         CTDB_NO_MEMORY(ctdb, recdata);
1107
1108         recdata->db_id = dbid;
1109
1110         params.ctdb = ctdb;
1111         params.recdata = recdata;
1112         params.len = offsetof(struct ctdb_marshall_buffer, data);
1113         params.failed = false;
1114         params.persistent = persistent;
1115
1116         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1118                 talloc_free(params.recdata);
1119                 talloc_free(tmp_ctx);
1120                 return -1;
1121         }
1122
1123         if (params.failed) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1125                 talloc_free(params.recdata);
1126                 talloc_free(tmp_ctx);
1127                 return -1;              
1128         }
1129
1130         recdata = params.recdata;
1131
1132         outdata.dptr = (void *)recdata;
1133         outdata.dsize = params.len;
1134
1135         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1136         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1137                                         nodes, 0,
1138                                         CONTROL_TIMEOUT(), false, outdata,
1139                                         NULL, NULL,
1140                                         NULL) != 0) {
1141                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1142                 talloc_free(recdata);
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n", 
1148                   dbid, recdata->count));
1149
1150         talloc_free(recdata);
1151         talloc_free(tmp_ctx);
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   go through a full recovery on one database 
1159  */
1160 static int recover_database(struct ctdb_recoverd *rec, 
1161                             TALLOC_CTX *mem_ctx,
1162                             uint32_t dbid,
1163                             bool persistent,
1164                             uint32_t pnn, 
1165                             struct ctdb_node_map *nodemap,
1166                             uint32_t transaction_id)
1167 {
1168         struct tdb_wrap *recdb;
1169         int ret;
1170         struct ctdb_context *ctdb = rec->ctdb;
1171         TDB_DATA data;
1172         struct ctdb_control_wipe_database w;
1173         uint32_t *nodes;
1174
1175         recdb = create_recdb(ctdb, mem_ctx);
1176         if (recdb == NULL) {
1177                 return -1;
1178         }
1179
1180         /* pull all remote databases onto the recdb */
1181         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1182         if (ret != 0) {
1183                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1184                 return -1;
1185         }
1186
1187         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1188
1189         /* wipe all the remote databases. This is safe as we are in a transaction */
1190         w.db_id = dbid;
1191         w.transaction_id = transaction_id;
1192
1193         data.dptr = (void *)&w;
1194         data.dsize = sizeof(w);
1195
1196         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1198                                         nodes, 0,
1199                                         CONTROL_TIMEOUT(), false, data,
1200                                         NULL, NULL,
1201                                         NULL) != 0) {
1202                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1203                 talloc_free(recdb);
1204                 return -1;
1205         }
1206         
1207         /* push out the correct database. This sets the dmaster and skips 
1208            the empty records */
1209         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1210         if (ret != 0) {
1211                 talloc_free(recdb);
1212                 return -1;
1213         }
1214
1215         /* all done with this database */
1216         talloc_free(recdb);
1217
1218         return 0;
1219 }
1220
1221 /*
1222   reload the nodes file 
1223 */
1224 static void reload_nodes_file(struct ctdb_context *ctdb)
1225 {
1226         ctdb->nodes = NULL;
1227         ctdb_load_nodes_file(ctdb);
1228 }
1229
1230 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1231                                          struct ctdb_recoverd *rec,
1232                                          struct ctdb_node_map *nodemap,
1233                                          uint32_t *culprit)
1234 {
1235         int j;
1236         int ret;
1237
1238         if (ctdb->num_nodes != nodemap->num) {
1239                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1240                                   ctdb->num_nodes, nodemap->num));
1241                 if (culprit) {
1242                         *culprit = ctdb->pnn;
1243                 }
1244                 return -1;
1245         }
1246
1247         for (j=0; j<nodemap->num; j++) {
1248                 /* release any existing data */
1249                 if (ctdb->nodes[j]->known_public_ips) {
1250                         talloc_free(ctdb->nodes[j]->known_public_ips);
1251                         ctdb->nodes[j]->known_public_ips = NULL;
1252                 }
1253                 if (ctdb->nodes[j]->available_public_ips) {
1254                         talloc_free(ctdb->nodes[j]->available_public_ips);
1255                         ctdb->nodes[j]->available_public_ips = NULL;
1256                 }
1257
1258                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1259                         continue;
1260                 }
1261
1262                 /* grab a new shiny list of public ips from the node */
1263                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1264                                         CONTROL_TIMEOUT(),
1265                                         ctdb->nodes[j]->pnn,
1266                                         ctdb->nodes,
1267                                         0,
1268                                         &ctdb->nodes[j]->known_public_ips);
1269                 if (ret != 0) {
1270                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1271                                 ctdb->nodes[j]->pnn));
1272                         if (culprit) {
1273                                 *culprit = ctdb->nodes[j]->pnn;
1274                         }
1275                         return -1;
1276                 }
1277
1278                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1279                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1280                         rec->need_takeover_run = true;
1281                 }
1282
1283                 /* grab the list of public ips this node can actually host (available ips) */
1284                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1285                                         CONTROL_TIMEOUT(),
1286                                         ctdb->nodes[j]->pnn,
1287                                         ctdb->nodes,
1288                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1289                                         &ctdb->nodes[j]->available_public_ips);
1290                 if (ret != 0) {
1291                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1292                                 ctdb->nodes[j]->pnn));
1293                         if (culprit) {
1294                                 *culprit = ctdb->nodes[j]->pnn;
1295                         }
1296                         return -1;
1297                 }
1298         }
1299
1300         return 0;
1301 }
1302
1303 /*
1304   we are the recmaster, and recovery is needed - start a recovery run
1305  */
1306 static int do_recovery(struct ctdb_recoverd *rec, 
1307                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1308                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1309 {
1310         struct ctdb_context *ctdb = rec->ctdb;
1311         int i, j, ret;
1312         uint32_t generation;
1313         struct ctdb_dbid_map *dbmap;
1314         TDB_DATA data;
1315         uint32_t *nodes;
1316         struct timeval start_time;
1317         uint32_t culprit = (uint32_t)-1;
1318
1319         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1320
1321         /* if recovery fails, force it again */
1322         rec->need_recovery = true;
1323
1324         for (i=0; i<ctdb->num_nodes; i++) {
1325                 struct ctdb_banning_state *ban_state;
1326
1327                 if (ctdb->nodes[i]->ban_state == NULL) {
1328                         continue;
1329                 }
1330                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1331                 if (ban_state->count < 2*ctdb->num_nodes) {
1332                         continue;
1333                 }
1334                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1335                         ctdb->nodes[i]->pnn, ban_state->count,
1336                         ctdb->tunable.recovery_ban_period));
1337                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1338                 ban_state->count = 0;
1339         }
1340
1341
1342         if (ctdb->tunable.verify_recovery_lock != 0) {
1343                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1344                 start_time = timeval_current();
1345                 if (!ctdb_recovery_lock(ctdb, true)) {
1346                         ctdb_set_culprit(rec, pnn);
1347                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1348                         return -1;
1349                 }
1350                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1351                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1352         }
1353
1354         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1355
1356         /* get a list of all databases */
1357         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1358         if (ret != 0) {
1359                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1360                 return -1;
1361         }
1362
1363         /* we do the db creation before we set the recovery mode, so the freeze happens
1364            on all databases we will be dealing with. */
1365
1366         /* verify that we have all the databases any other node has */
1367         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1368         if (ret != 0) {
1369                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1370                 return -1;
1371         }
1372
1373         /* verify that all other nodes have all our databases */
1374         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1375         if (ret != 0) {
1376                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1377                 return -1;
1378         }
1379         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1380
1381         /* update the database priority for all remote databases */
1382         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1383         if (ret != 0) {
1384                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1385         }
1386         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1387
1388
1389         /* set recovery mode to active on all nodes */
1390         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1391         if (ret != 0) {
1392                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1393                 return -1;
1394         }
1395
1396         /* execute the "startrecovery" event script on all nodes */
1397         ret = run_startrecovery_eventscript(rec, nodemap);
1398         if (ret!=0) {
1399                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1400                 return -1;
1401         }
1402
1403         /*
1404           update all nodes to have the same flags that we have
1405          */
1406         for (i=0;i<nodemap->num;i++) {
1407                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1408                         continue;
1409                 }
1410
1411                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1412                 if (ret != 0) {
1413                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1414                         return -1;
1415                 }
1416         }
1417
1418         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1419
1420         /* pick a new generation number */
1421         generation = new_generation();
1422
1423         /* change the vnnmap on this node to use the new generation
1424            number, but not on any other nodes.
1425            This guarantees that if we abort the recovery prematurely
1426            for some reason (for example a node stops responding) we can
1427            just return immediately and recovery will be re-entered
1428            shortly afterwards.
1429            I.e. we deliberately leave the cluster with an inconsistent
1430            generation id so that we can abort recovery at any stage and
1431            just restart it from scratch.
1432          */
1433         vnnmap->generation = generation;
1434         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1437                 return -1;
1438         }
1439
1440         data.dptr = (void *)&generation;
1441         data.dsize = sizeof(uint32_t);
1442
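             /* the new generation number is the payload of the TRANSACTION_START
                control: every active node starts its recovery transaction tagged
                with the same generation, and if any node fails to start we send
                TRANSACTION_CANCEL to the whole set and give up */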
1443         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1444         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1445                                         nodes, 0,
1446                                         CONTROL_TIMEOUT(), false, data,
1447                                         NULL,
1448                                         transaction_start_fail_callback,
1449                                         rec) != 0) {
1450                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1451                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1452                                         nodes, 0,
1453                                         CONTROL_TIMEOUT(), false, tdb_null,
1454                                         NULL,
1455                                         NULL,
1456                                         NULL) != 0) {
1457                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1458                 }
1459                 return -1;
1460         }
1461
1462         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1463
1464         for (i=0;i<dbmap->num;i++) {
1465                 ret = recover_database(rec, mem_ctx,
1466                                        dbmap->dbs[i].dbid,
1467                                        dbmap->dbs[i].persistent,
1468                                        pnn, nodemap, generation);
1469                 if (ret != 0) {
1470                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1471                         return -1;
1472                 }
1473         }
1474
1475         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1476
1477         /* commit all the changes */
1478         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1479                                         nodes, 0,
1480                                         CONTROL_TIMEOUT(), false, data,
1481                                         NULL, NULL,
1482                                         NULL) != 0) {
1483                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1484                 return -1;
1485         }
1486
1487         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1488         
1489
1490         /* update the capabilities for all nodes */
1491         ret = update_capabilities(ctdb, nodemap);
1492         if (ret!=0) {
1493                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1494                 return -1;
1495         }
1496
1497         /* build a new vnn map with all the currently active and
1498            unbanned nodes */
1499         generation = new_generation();
1500         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1501         CTDB_NO_MEMORY(ctdb, vnnmap);
1502         vnnmap->generation = generation;
1503         vnnmap->size = 0;
1504         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1505         CTDB_NO_MEMORY(ctdb, vnnmap->map);
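             /* i walks the nodemap while j is the next free slot in the map;
                the map array is grown one entry at a time as eligible
                (active, lmaster-capable) nodes are found */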
1506         for (i=j=0;i<nodemap->num;i++) {
1507                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1508                         continue;
1509                 }
1510                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1511                         /* this node cannot be an lmaster */
1512                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1513                         continue;
1514                 }
1515
1516                 vnnmap->size++;
1517                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1518                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1519                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1520
1521         }
1522         if (vnnmap->size == 0) {
1523                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1524                 vnnmap->size++;
1525                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1526                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1527                 vnnmap->map[0] = pnn;
1528         }       
1529
1530         /* update to the new vnnmap on all nodes */
1531         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1532         if (ret != 0) {
1533                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1534                 return -1;
1535         }
1536
1537         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1538
1539         /* update recmaster to point to us for all nodes */
1540         ret = set_recovery_master(ctdb, nodemap, pnn);
1541         if (ret!=0) {
1542                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1543                 return -1;
1544         }
1545
1546         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1547
1548         /*
1549           update all nodes to have the same flags that we have
1550          */
1551         for (i=0;i<nodemap->num;i++) {
1552                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1553                         continue;
1554                 }
1555
1556                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1557                 if (ret != 0) {
1558                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1559                         return -1;
1560                 }
1561         }
1562
1563         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1564
1565         /* disable recovery mode */
1566         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1567         if (ret != 0) {
1568                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1569                 return -1;
1570         }
1571
1572         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1573
1574         /*
1575           tell nodes to takeover their public IPs
1576          */
1577         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1580                                  culprit));
1581                 return -1;
1582         }
1583         rec->need_takeover_run = false;
1584         ret = ctdb_takeover_run(ctdb, nodemap);
1585         if (ret != 0) {
1586                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1587                 return -1;
1588         }
1589         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1590
1591         /* execute the "recovered" event script on all nodes */
1592         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1593         if (ret!=0) {
1594                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1595                 return -1;
1596         }
1597
1598         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1599
1600         /* send a message to all clients telling them that the cluster 
1601            has been reconfigured */
1602         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1603
1604         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1605
1606         rec->need_recovery = false;
1607
1608         /* we managed to complete a full recovery, make sure to forgive
1609            any past sins by the nodes that could now participate in the
1610            recovery.
1611         */
1612         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1613         for (i=0;i<nodemap->num;i++) {
1614                 struct ctdb_banning_state *ban_state;
1615
1616                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1617                         continue;
1618                 }
1619
1620                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1621                 if (ban_state == NULL) {
1622                         continue;
1623                 }
1624
1625                 ban_state->count = 0;
1626         }
1627
1628
1629         /* We just finished a recovery successfully. 
1630            We now wait for rerecovery_timeout before we allow 
1631            another recovery to take place.
1632         */
1633         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1634         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1635         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1636
1637         return 0;
1638 }
1639
1640
1641 /*
1642   elections are won by first checking the number of connected nodes, then
1643   the priority time, then the pnn
1644  */
1645 struct election_message {
1646         uint32_t num_connected;
1647         struct timeval priority_time;
1648         uint32_t pnn;
1649         uint32_t node_flags;
1650 };
1651
1652 /*
1653   form this node's election data
1654  */
1655 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1656 {
1657         int ret, i;
1658         struct ctdb_node_map *nodemap;
1659         struct ctdb_context *ctdb = rec->ctdb;
1660
1661         ZERO_STRUCTP(em);
1662
1663         em->pnn = rec->ctdb->pnn;
1664         em->priority_time = rec->priority_time;
1665
1666         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1667         if (ret != 0) {
1668                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1669                 return;
1670         }
1671
1672         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1673         em->node_flags = rec->node_flags;
1674
1675         for (i=0;i<nodemap->num;i++) {
1676                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1677                         em->num_connected++;
1678                 }
1679         }
1680
1681         /* we shouldn't try to win this election if we can't be a recmaster */
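             /* zeroing num_connected and resetting priority_time makes our
                election data compare poorly against any node that does have
                the recmaster capability */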
1682         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1683                 em->num_connected = 0;
1684                 em->priority_time = timeval_current();
1685         }
1686
1687         talloc_free(nodemap);
1688 }
1689
1690 /*
1691   see if the given election data wins
1692  */
1693 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1694 {
1695         struct election_message myem;
1696         int cmp = 0;
1697
1698         ctdb_election_data(rec, &myem);
1699
1700         /* we can't win if we don't have the recmaster capability */
1701         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1702                 return false;
1703         }
1704
1705         /* we can't win if we are banned */
1706         if (rec->node_flags & NODE_FLAGS_BANNED) {
1707                 return false;
1708         }       
1709
1710         /* we can't win if we are stopped */
1711         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1712                 return false;
1713         }       
1714
1715         /* we will automatically win if the other node is banned */
1716         if (em->node_flags & NODE_FLAGS_BANNED) {
1717                 return true;
1718         }
1719
1720         /* we will automatically win if the other node is stopped */
1721         if (em->node_flags & NODE_FLAGS_STOPPED) {
1722                 return true;
1723         }
1724
1725         /* try to use the most connected node */
1726         if (cmp == 0) {
1727                 cmp = (int)myem.num_connected - (int)em->num_connected;
1728         }
1729
1730         /* then the longest running node */
1731         if (cmp == 0) {
1732                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1733         }
1734
1735         if (cmp == 0) {
1736                 cmp = (int)myem.pnn - (int)em->pnn;
1737         }
1738
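             /* a positive cmp means our own election data beats the
                challenger's, so we should contest the election */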
1739         return cmp > 0;
1740 }
1741
1742 /*
1743   send out an election request
1744  */
1745 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1746 {
1747         int ret;
1748         TDB_DATA election_data;
1749         struct election_message emsg;
1750         uint64_t srvid;
1751         struct ctdb_context *ctdb = rec->ctdb;
1752
1753         srvid = CTDB_SRVID_RECOVERY;
1754
1755         ctdb_election_data(rec, &emsg);
1756
1757         election_data.dsize = sizeof(struct election_message);
1758         election_data.dptr  = (unsigned char *)&emsg;
1759
1760
1761         /* send an election message to all active nodes */
1762         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1763         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1764
1765
1766         /* A new node that is already frozen has entered the cluster.
1767            The existing nodes are not frozen and don't need to be frozen
1768            until the election has ended and we start the actual recovery
1769         */
1770         if (update_recmaster == true) {
1771                 /* first we assume we will win the election and set 
1772                    recoverymaster to be ourself on the current node
1773                  */
1774                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1775                 if (ret != 0) {
1776                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1777                         return -1;
1778                 }
1779         }
1780
1781
1782         return 0;
1783 }
1784
1785 /*
1786   this function will unban all nodes in the cluster
1787 */
1788 static void unban_all_nodes(struct ctdb_context *ctdb)
1789 {
1790         int ret, i;
1791         struct ctdb_node_map *nodemap;
1792         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1793         
1794         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1795         if (ret != 0) {
1796                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1797                 return;
1798         }
1799
1800         for (i=0;i<nodemap->num;i++) {
1801                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1802                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1803                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1804                 }
1805         }
1806
1807         talloc_free(tmp_ctx);
1808 }
1809
1810
1811 /*
1812   we think we are winning the election - send a broadcast election request
1813  */
1814 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1815 {
1816         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1817         int ret;
1818
1819         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1820         if (ret != 0) {
1821                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1822         }
1823
1824         talloc_free(rec->send_election_te);
1825         rec->send_election_te = NULL;
1826 }
1827
1828 /*
1829   handler for memory dumps
1830 */
1831 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1832                              TDB_DATA data, void *private_data)
1833 {
1834         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1835         TDB_DATA *dump;
1836         int ret;
1837         struct rd_memdump_reply *rd;
1838
1839         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1840                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1841                 talloc_free(tmp_ctx);
1842                 return;
1843         }
1844         rd = (struct rd_memdump_reply *)data.dptr;
1845
1846         dump = talloc_zero(tmp_ctx, TDB_DATA);
1847         if (dump == NULL) {
1848                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1849                 talloc_free(tmp_ctx);
1850                 return;
1851         }
1852         ret = ctdb_dump_memory(ctdb, dump);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1855                 talloc_free(tmp_ctx);
1856                 return;
1857         }
1858
1859         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1860
1861         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1862         if (ret != 0) {
1863                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1864                 talloc_free(tmp_ctx);
1865                 return;
1866         }
1867
1868         talloc_free(tmp_ctx);
1869 }
1870
1871 /*
1872   handler for reload_nodes
1873 */
1874 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1875                              TDB_DATA data, void *private_data)
1876 {
1877         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1878
1879         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1880
1881         reload_nodes_file(rec->ctdb);
1882 }
1883
1884
1885 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1886                               struct timeval yt, void *p)
1887 {
1888         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1889
1890         talloc_free(rec->ip_check_disable_ctx);
1891         rec->ip_check_disable_ctx = NULL;
1892 }
1893
1894 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1895                              TDB_DATA data, void *private_data)
1896 {
1897         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1898         uint32_t timeout;
1899
1900         if (rec->ip_check_disable_ctx != NULL) {
1901                 talloc_free(rec->ip_check_disable_ctx);
1902                 rec->ip_check_disable_ctx = NULL;
1903         }
1904
1905         if (data.dsize != sizeof(uint32_t)) {
1906                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu, "
1907                                  "expecting %lu\n", (long unsigned)data.dsize,
1908                                  (long unsigned)sizeof(uint32_t)));
1909                 return;
1910         }
1911         if (data.dptr == NULL) {
1912                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1913                 return;
1914         }
1915
1916         timeout = *((uint32_t *)data.dptr);
1917         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1918
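             /* allocate a fresh context to mark the check as disabled; the
                timed event below frees it again via reenable_ip_check once
                the timeout expires */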
1919         rec->ip_check_disable_ctx = talloc_new(rec);
1920         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1921
1922         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1923 }
1924
1925
1926 /*
1927   handler for ip reallocate, just add it to the list of callers and 
1928   handle this later in the monitor_cluster loop so we do not recurse
1929   with other callers to takeover_run()
1930 */
1931 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1932                              TDB_DATA data, void *private_data)
1933 {
1934         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1935         struct ip_reallocate_list *caller;
1936
1937         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1938                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1939                 return;
1940         }
1941
1942         if (rec->ip_reallocate_ctx == NULL) {
1943                 rec->ip_reallocate_ctx = talloc_new(rec);
1944                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1945         }
1946
1947         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1948         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1949
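             /* take ownership of the caller's return address (pnn/srvid) so
                that a reply can be sent once the deferred takeover run has
                completed in process_ipreallocate_requests() */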
1950         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1951         caller->next = rec->reallocate_callers;
1952         rec->reallocate_callers = caller;
1953
1954         return;
1955 }
1956
1957 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1958 {
1959         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1960         TDB_DATA result;
1961         int32_t ret;
1962         struct ip_reallocate_list *callers;
1963         uint32_t culprit;
1964
1965         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1966
1967         /* update the list of public ips that a node can handle for
1968            all connected nodes
1969         */
1970         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
1971         if (ret != 0) {
1972                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1973                                  culprit));
1974                 rec->need_takeover_run = true;
1975         }
1976         if (ret == 0) {
1977                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
1978                 if (ret != 0) {
1979                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1980                                          culprit));
1981                         rec->need_takeover_run = true;
1982                 }
1983         }
1984
1985         result.dsize = sizeof(int32_t);
1986         result.dptr  = (uint8_t *)&ret;
1987
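             /* reply to every caller that asked for one with the (32 bit)
                status of the takeover run */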
1988         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1989
1990                 /* Someone that sent srvid==0 does not want a reply */
1991                 if (callers->rd->srvid == 0) {
1992                         continue;
1993                 }
1994                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1995                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1996                                   (unsigned long long)callers->rd->srvid));
1997                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1998                 if (ret != 0) {
1999                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2000                                          "message to %u:%llu\n",
2001                                          (unsigned)callers->rd->pnn,
2002                                          (unsigned long long)callers->rd->srvid));
2003                 }
2004         }
2005
2006         talloc_free(tmp_ctx);
2007         talloc_free(rec->ip_reallocate_ctx);
2008         rec->ip_reallocate_ctx = NULL;
2009         rec->reallocate_callers = NULL;
2010         
2011 }
2012
2013
2014 /*
2015   handler for recovery master elections
2016 */
2017 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2018                              TDB_DATA data, void *private_data)
2019 {
2020         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2021         int ret;
2022         struct election_message *em = (struct election_message *)data.dptr;
2023         TALLOC_CTX *mem_ctx;
2024
2025         /* we got an election packet - update the timeout for the election */
2026         talloc_free(rec->election_timeout);
2027         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2028                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2029                                                 ctdb_election_timeout, rec);
2030
2031         mem_ctx = talloc_new(ctdb);
2032
2033         /* someone called an election. check their election data
2034            and if we disagree and we would rather be the elected node, 
2035            send a new election message to all other nodes
2036          */
2037         if (ctdb_election_win(rec, em)) {
2038                 if (!rec->send_election_te) {
2039                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2040                                                                 timeval_current_ofs(0, 500000),
2041                                                                 election_send_request, rec);
2042                 }
2043                 talloc_free(mem_ctx);
2044                 /*unban_all_nodes(ctdb);*/
2045                 return;
2046         }
2047         
2048         /* we didn't win */
2049         talloc_free(rec->send_election_te);
2050         rec->send_election_te = NULL;
2051
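             /* if the election packet came from another node and we were
                holding the recovery lock, release it so the winning node can
                take it; unbanning everyone presumably gives the cluster a
                clean slate under the new recovery master */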
2052         if (ctdb->tunable.verify_recovery_lock != 0) {
2053                 /* release the recmaster lock */
2054                 if (em->pnn != ctdb->pnn &&
2055                     ctdb->recovery_lock_fd != -1) {
2056                         close(ctdb->recovery_lock_fd);
2057                         ctdb->recovery_lock_fd = -1;
2058                         unban_all_nodes(ctdb);
2059                 }
2060         }
2061
2062         /* ok, let that guy become recmaster then */
2063         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2064         if (ret != 0) {
2065                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2066                 talloc_free(mem_ctx);
2067                 return;
2068         }
2069
2070         talloc_free(mem_ctx);
2071         return;
2072 }
2073
2074
2075 /*
2076   force the start of the election process
2077  */
2078 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2079                            struct ctdb_node_map *nodemap)
2080 {
2081         int ret;
2082         struct ctdb_context *ctdb = rec->ctdb;
2083
2084         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2085
2086         /* set all nodes to recovery mode to stop all internode traffic */
2087         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2088         if (ret != 0) {
2089                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2090                 return;
2091         }
2092
2093         talloc_free(rec->election_timeout);
2094         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2095                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2096                                                 ctdb_election_timeout, rec);
2097
2098         ret = send_election_request(rec, pnn, true);
2099         if (ret!=0) {
2100                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2101                 return;
2102         }
2103
2104         /* wait for a few seconds to collect all responses */
2105         ctdb_wait_election(rec);
2106 }
2107
2108
2109
2110 /*
2111   handler for when a node changes its flags
2112 */
2113 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2114                             TDB_DATA data, void *private_data)
2115 {
2116         int ret;
2117         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2118         struct ctdb_node_map *nodemap=NULL;
2119         TALLOC_CTX *tmp_ctx;
2120         uint32_t changed_flags;
2121         int i;
2122         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2123         int disabled_flag_changed;
2124
2125         if (data.dsize != sizeof(*c)) {
2126                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2127                 return;
2128         }
2129
2130         tmp_ctx = talloc_new(ctdb);
2131         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2132
2133         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2134         if (ret != 0) {
2135                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2136                 talloc_free(tmp_ctx);
2137                 return;         
2138         }
2139
2140
2141         for (i=0;i<nodemap->num;i++) {
2142                 if (nodemap->nodes[i].pnn == c->pnn) break;
2143         }
2144
2145         if (i == nodemap->num) {
2146                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2147                 talloc_free(tmp_ctx);
2148                 return;
2149         }
2150
2151         changed_flags = c->old_flags ^ c->new_flags;
2152
2153         if (nodemap->nodes[i].flags != c->new_flags) {
2154                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2155         }
2156
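             /* remember whether the DISABLED bit differs between our cached
                view and the new flags; only that transition should trigger a
                takeover run further down */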
2157         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2158
2159         nodemap->nodes[i].flags = c->new_flags;
2160
2161         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2162                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2163
2164         if (ret == 0) {
2165                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2166                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2167         }
2168         
2169         if (ret == 0 &&
2170             ctdb->recovery_master == ctdb->pnn &&
2171             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2172                 /* Only do the takeover run if the permanently-disabled or
2173                    unhealthy flags changed, since those cause an ip failover
2174                    but not a recovery.
2175                    If the node became disconnected or banned, that also leads
2176                    to an ip address failover, but that case is handled during
2177                    recovery.
2178                 */
2179                 if (disabled_flag_changed) {
2180                         rec->need_takeover_run = true;
2181                 }
2182         }
2183
2184         talloc_free(tmp_ctx);
2185 }
2186
2187 /*
2188   handler for when we need to push out flag changes to all other nodes
2189 */
2190 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2191                             TDB_DATA data, void *private_data)
2192 {
2193         int ret;
2194         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2195         struct ctdb_node_map *nodemap=NULL;
2196         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2197         uint32_t recmaster;
2198         uint32_t *nodes;
2199
2200         /* find the recovery master */
2201         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2202         if (ret != 0) {
2203                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2204                 talloc_free(tmp_ctx);
2205                 return;
2206         }
2207
2208         /* read the node flags from the recmaster */
2209         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2210         if (ret != 0) {
2211                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2212                 talloc_free(tmp_ctx);
2213                 return;
2214         }
2215         if (c->pnn >= nodemap->num) {
2216                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2217                 talloc_free(tmp_ctx);
2218                 return;
2219         }
2220
2221         /* send the flags update to all connected nodes */
2222         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2223
2224         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2225                                       nodes, 0, CONTROL_TIMEOUT(),
2226                                       false, data,
2227                                       NULL, NULL,
2228                                       NULL) != 0) {
2229                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2230
2231                 talloc_free(tmp_ctx);
2232                 return;
2233         }
2234
2235         talloc_free(tmp_ctx);
2236 }
2237
2238
2239 struct verify_recmode_normal_data {
2240         uint32_t count;
2241         enum monitor_result status;
2242 };
2243
2244 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2245 {
2246         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2247
2248
2249         /* one more node has responded with recmode data */
2250         rmdata->count--;
2251
2252         /* if we failed to get the recmode, then return an error and let
2253            the main loop try again.
2254         */
2255         if (state->state != CTDB_CONTROL_DONE) {
2256                 if (rmdata->status == MONITOR_OK) {
2257                         rmdata->status = MONITOR_FAILED;
2258                 }
2259                 return;
2260         }
2261
2262         /* if we got a response, then the recmode will be stored in the
2263            status field
2264         */
2265         if (state->status != CTDB_RECOVERY_NORMAL) {
2266                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2267                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2268         }
2269
2270         return;
2271 }
2272
2273
2274 /* verify that all nodes are in normal recovery mode */
2275 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2276 {
2277         struct verify_recmode_normal_data *rmdata;
2278         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2279         struct ctdb_client_control_state *state;
2280         enum monitor_result status;
2281         int j;
2282         
2283         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2284         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2285         rmdata->count  = 0;
2286         rmdata->status = MONITOR_OK;
2287
2288         /* loop over all active nodes and send an async getrecmode call to 
2289            them */
2290         for (j=0; j<nodemap->num; j++) {
2291                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2292                         continue;
2293                 }
2294                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2295                                         CONTROL_TIMEOUT(), 
2296                                         nodemap->nodes[j].pnn);
2297                 if (state == NULL) {
2298                         /* we failed to send the control, treat this as 
2299                            an error and try again next iteration
2300                         */                      
2301                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2302                         talloc_free(mem_ctx);
2303                         return MONITOR_FAILED;
2304                 }
2305
2306                 /* set up the callback functions */
2307                 state->async.fn = verify_recmode_normal_callback;
2308                 state->async.private_data = rmdata;
2309
2310                 /* one more control to wait for to complete */
2311                 rmdata->count++;
2312         }
2313
2314
2315         /* now wait for up to the maximum number of seconds allowed
2316            or until all nodes we expect a response from have replied
2317         */
2318         while (rmdata->count > 0) {
2319                 event_loop_once(ctdb->ev);
2320         }
2321
2322         status = rmdata->status;
2323         talloc_free(mem_ctx);
2324         return status;
2325 }
2326
2327
2328 struct verify_recmaster_data {
2329         struct ctdb_recoverd *rec;
2330         uint32_t count;
2331         uint32_t pnn;
2332         enum monitor_result status;
2333 };
2334
2335 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2336 {
2337         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2338
2339
2340         /* one more node has responded with recmaster data */
2341         rmdata->count--;
2342
2343         /* if we failed to get the recmaster, then return an error and let
2344            the main loop try again.
2345         */
2346         if (state->state != CTDB_CONTROL_DONE) {
2347                 if (rmdata->status == MONITOR_OK) {
2348                         rmdata->status = MONITOR_FAILED;
2349                 }
2350                 return;
2351         }
2352
2353         /* if we got a response, then the recmaster will be stored in the
2354            status field
2355         */
2356         if (state->status != rmdata->pnn) {
2357                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2358                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2359                 rmdata->status = MONITOR_ELECTION_NEEDED;
2360         }
2361
2362         return;
2363 }
2364
2365
2366 /* verify that all nodes agree that we are the recmaster */
2367 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2368 {
2369         struct ctdb_context *ctdb = rec->ctdb;
2370         struct verify_recmaster_data *rmdata;
2371         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2372         struct ctdb_client_control_state *state;
2373         enum monitor_result status;
2374         int j;
2375         
2376         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2377         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2378         rmdata->rec    = rec;
2379         rmdata->count  = 0;
2380         rmdata->pnn    = pnn;
2381         rmdata->status = MONITOR_OK;
2382
2383         /* loop over all active nodes and send an async getrecmaster call to 
2384            them */
2385         for (j=0; j<nodemap->num; j++) {
2386                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2387                         continue;
2388                 }
2389                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2390                                         CONTROL_TIMEOUT(),
2391                                         nodemap->nodes[j].pnn);
2392                 if (state == NULL) {
2393                         /* we failed to send the control, treat this as 
2394                            an error and try again next iteration
2395                         */                      
2396                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2397                         talloc_free(mem_ctx);
2398                         return MONITOR_FAILED;
2399                 }
2400
2401                 /* set up the callback functions */
2402                 state->async.fn = verify_recmaster_callback;
2403                 state->async.private_data = rmdata;
2404
2405                 /* one more control to wait for to complete */
2406                 rmdata->count++;
2407         }
2408
2409
2410         /* now wait for up to the maximum number of seconds allowed
2411            or until all nodes we expect a response from have replied
2412         */
2413         while (rmdata->count > 0) {
2414                 event_loop_once(ctdb->ev);
2415         }
2416
2417         status = rmdata->status;
2418         talloc_free(mem_ctx);
2419         return status;
2420 }
2421
2422
2423 /* called to check that the local allocation of public ip addresses is ok.
2424 */
2425 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2426 {
2427         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2428         struct ctdb_control_get_ifaces *ifaces = NULL;
2429         struct ctdb_all_public_ips *ips = NULL;
2430         struct ctdb_uptime *uptime1 = NULL;
2431         struct ctdb_uptime *uptime2 = NULL;
2432         int ret, j;
2433         bool need_iface_check = false;
2434         bool need_takeover_run = false;
2435
2436         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2437                                 CTDB_CURRENT_NODE, &uptime1);
2438         if (ret != 0) {
2439                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2440                 talloc_free(mem_ctx);
2441                 return -1;
2442         }
2443
2444
2445         /* read the interfaces from the local node */
2446         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2447         if (ret != 0) {
2448                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2449                 talloc_free(mem_ctx);
2450                 return -1;
2451         }
2452
2453         if (!rec->ifaces) {
2454                 need_iface_check = true;
2455         } else if (rec->ifaces->num != ifaces->num) {
2456                 need_iface_check = true;
2457         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2458                 need_iface_check = true;
2459         }
2460
2461         if (need_iface_check) {
2462                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2463                                      "local node %u - force takeover run\n",
2464                                      pnn));
2465                 need_takeover_run = true;
2466         }
2467
2468         /* read the ip allocation from the local node */
2469         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2470         if (ret != 0) {
2471                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2472                 talloc_free(mem_ctx);
2473                 return -1;
2474         }
2475
2476         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2477                                 CTDB_CURRENT_NODE, &uptime2);
2478         if (ret != 0) {
2479                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2480                 talloc_free(mem_ctx);
2481                 return -1;
2482         }
2483
2484         /* skip the check if the startrecovery time has changed */
2485         if (timeval_compare(&uptime1->last_recovery_started,
2486                             &uptime2->last_recovery_started) != 0) {
2487                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2488                 talloc_free(mem_ctx);
2489                 return 0;
2490         }
2491
2492         /* skip the check if the endrecovery time has changed */
2493         if (timeval_compare(&uptime1->last_recovery_finished,
2494                             &uptime2->last_recovery_finished) != 0) {
2495                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2496                 talloc_free(mem_ctx);
2497                 return 0;
2498         }
2499
2500         /* skip the check if we have started but not finished recovery */
2501         if (timeval_compare(&uptime1->last_recovery_finished,
2502                             &uptime1->last_recovery_started) != 1) {
2503                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2504                 talloc_free(mem_ctx);
2505
2506                 return 0;
2507         }
2508
2509         talloc_free(rec->ifaces);
2510         rec->ifaces = talloc_steal(rec, ifaces);
2511
2512         /* verify that we have the ip addresses we should have
2513            and we don't have ones we shouldn't have.
2514            if we find an inconsistency we set recmode to
2515            active on the local node and wait for the recmaster
2516            to do a full blown recovery
2517         */
2518         for (j=0; j<ips->num; j++) {
2519                 if (ips->ips[j].pnn == pnn) {
2520                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2521                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2522                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2523                                 need_takeover_run = true;
2524                         }
2525                 } else {
2526                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2527                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2528                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2529                                 need_takeover_run = true;
2530                         }
2531                 }
2532         }
2533
2534         if (need_takeover_run) {
2535                 struct takeover_run_reply rd;
2536                 TDB_DATA data;
2537
2538                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2539
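                     /* srvid 0 means we do not expect a reply; the recmaster
                        simply queues a takeover run for its next pass through
                        the monitoring loop */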
2540                 rd.pnn = ctdb->pnn;
2541                 rd.srvid = 0;
2542                 data.dptr = (uint8_t *)&rd;
2543                 data.dsize = sizeof(rd);
2544
2545                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2546                 if (ret != 0) {
2547                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster: %d\n", (int)rec->recmaster));
2548                 }
2549         }
2550         talloc_free(mem_ctx);
2551         return 0;
2552 }
2553
2554
2555 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2556 {
2557         struct ctdb_node_map **remote_nodemaps = callback_data;
2558
2559         if (node_pnn >= ctdb->num_nodes) {
2560                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2561                 return;
2562         }
2563
2564         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2565
2566 }
2567
2568 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2569         struct ctdb_node_map *nodemap,
2570         struct ctdb_node_map **remote_nodemaps)
2571 {
2572         uint32_t *nodes;
2573
2574         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2575         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2576                                         nodes, 0,
2577                                         CONTROL_TIMEOUT(), false, tdb_null,
2578                                         async_getnodemap_callback,
2579                                         NULL,
2580                                         remote_nodemaps) != 0) {
2581                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2582
2583                 return -1;
2584         }
2585
2586         return 0;
2587 }
2588
2589 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2590 struct ctdb_check_reclock_state {
2591         struct ctdb_context *ctdb;
2592         struct timeval start_time;
2593         int fd[2];
2594         pid_t child;
2595         struct timed_event *te;
2596         struct fd_event *fde;
2597         enum reclock_child_status status;
2598 };
2599
2600 /* when we free the reclock state we must kill any child process.
2601 */
2602 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2603 {
2604         struct ctdb_context *ctdb = state->ctdb;
2605
2606         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2607
2608         if (state->fd[0] != -1) {
2609                 close(state->fd[0]);
2610                 state->fd[0] = -1;
2611         }
2612         if (state->fd[1] != -1) {
2613                 close(state->fd[1]);
2614                 state->fd[1] = -1;
2615         }
2616         kill(state->child, SIGKILL);
2617         return 0;
2618 }
2619
2620 /*
2621   called if our check_reclock child times out. this would happen if
2622   i/o to the reclock file blocks.
2623  */
2624 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2625                                          struct timeval t, void *private_data)
2626 {
2627         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2628                                            struct ctdb_check_reclock_state);
2629
2630         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out. Is the cluster filesystem slow to grant locks?\n"));
2631         state->status = RECLOCK_TIMEOUT;
2632 }
2633
2634 /* this is called when the child process has completed checking the reclock
2635    file and has written data back to us through the pipe.
2636 */
2637 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2638                              uint16_t flags, void *private_data)
2639 {
2640         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2641                                              struct ctdb_check_reclock_state);
2642         char c = 0;
2643         int ret;
2644
2645         /* we got a response from our child process so we can abort the
2646            timeout.
2647         */
2648         talloc_free(state->te);
2649         state->te = NULL;
2650
2651         ret = read(state->fd[0], &c, 1);
2652         if (ret != 1 || c != RECLOCK_OK) {
2653                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2654                 state->status = RECLOCK_FAILED;
2655
2656                 return;
2657         }
2658
2659         state->status = RECLOCK_OK;
2660         return;
2661 }
2662
2663 static int check_recovery_lock(struct ctdb_context *ctdb)
2664 {
2665         int ret;
2666         struct ctdb_check_reclock_state *state;
2667         pid_t parent = getpid();
2668
2669         if (ctdb->recovery_lock_fd == -1) {
2670                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2671                 return -1;
2672         }
2673
2674         state = talloc(ctdb, struct ctdb_check_reclock_state);
2675         CTDB_NO_MEMORY(ctdb, state);
2676
2677         state->ctdb = ctdb;
2678         state->start_time = timeval_current();
2679         state->status = RECLOCK_CHECKING;
2680         state->fd[0] = -1;
2681         state->fd[1] = -1;
2682
2683         ret = pipe(state->fd);
2684         if (ret != 0) {
2685                 talloc_free(state);
2686                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2687                 return -1;
2688         }
2689
2690         state->child = fork();
2691         if (state->child == (pid_t)-1) {
2692                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2693                 close(state->fd[0]);
2694                 state->fd[0] = -1;
2695                 close(state->fd[1]);
2696                 state->fd[1] = -1;
2697                 talloc_free(state);
2698                 return -1;
2699         }
2700
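             /* child: probe the reclock file with a single pread() and report
                the result back over the pipe; after that it keeps re-sending
                the same status byte every few seconds until the parent goes
                away, so the child cannot linger long after the recovery
                daemon exits */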
2701         if (state->child == 0) {
2702                 char cc = RECLOCK_OK;
2703                 close(state->fd[0]);
2704                 state->fd[0] = -1;
2705
2706                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2707                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2708                         cc = RECLOCK_FAILED;
2709                 }
2710
2711                 write(state->fd[1], &cc, 1);
2712                 /* make sure we die when our parent dies */
2713                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2714                         sleep(5);
2715                         write(state->fd[1], &cc, 1);
2716                 }
2717                 _exit(0);
2718         }
2719         close(state->fd[1]);
2720         state->fd[1] = -1;
2721         set_close_on_exec(state->fd[0]);
2722
2723         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2724
2725         talloc_set_destructor(state, check_reclock_destructor);
2726
2727         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2728                                     ctdb_check_reclock_timeout, state);
2729         if (state->te == NULL) {
2730                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2731                 talloc_free(state);
2732                 return -1;
2733         }
2734
2735         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2736                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2737                                 reclock_child_handler,
2738                                 (void *)state);
2739
2740         if (state->fde == NULL) {
2741                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2742                 talloc_free(state);
2743                 return -1;
2744         }
2745
2746         while (state->status == RECLOCK_CHECKING) {
2747                 event_loop_once(ctdb->ev);
2748         }
2749
2750         if (state->status == RECLOCK_FAILED) {
2751                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2752                 close(ctdb->recovery_lock_fd);
2753                 ctdb->recovery_lock_fd = -1;
2754                 talloc_free(state);
2755                 return -1;
2756         }
2757
2758         talloc_free(state);
2759         return 0;
2760 }
2761
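     /*
       fetch the current reclock file setting from the main daemon and apply
       any change locally (disabled, newly set, unchanged or changed)
      */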
2762 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2763 {
2764         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2765         const char *reclockfile;
2766
2767         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2768                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2769                 talloc_free(tmp_ctx);
2770                 return -1;      
2771         }
2772
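             /* the reclock has been disabled on the main daemon - drop our copy
                of the file name, close any open lock fd and stop verification */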
2773         if (reclockfile == NULL) {
2774                 if (ctdb->recovery_lock_file != NULL) {
2775                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2776                         talloc_free(ctdb->recovery_lock_file);
2777                         ctdb->recovery_lock_file = NULL;
2778                         if (ctdb->recovery_lock_fd != -1) {
2779                                 close(ctdb->recovery_lock_fd);
2780                                 ctdb->recovery_lock_fd = -1;
2781                         }
2782                 }
2783                 ctdb->tunable.verify_recovery_lock = 0;
2784                 talloc_free(tmp_ctx);
2785                 return 0;
2786         }
2787
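             /* a reclock file has just been configured - remember the new name
                and close any stale fd */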
2788         if (ctdb->recovery_lock_file == NULL) {
2789                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2790                 if (ctdb->recovery_lock_fd != -1) {
2791                         close(ctdb->recovery_lock_fd);
2792                         ctdb->recovery_lock_fd = -1;
2793                 }
2794                 talloc_free(tmp_ctx);
2795                 return 0;
2796         }
2797
2798
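             /* the reclock file is unchanged - nothing to do */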
2799         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2800                 talloc_free(tmp_ctx);
2801                 return 0;
2802         }
2803
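             /* the reclock file has changed - remember the new name, turn off
                lock verification for now and close the old fd */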
2804         talloc_free(ctdb->recovery_lock_file);
2805         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2806         ctdb->tunable.verify_recovery_lock = 0;
2807         if (ctdb->recovery_lock_fd != -1) {
2808                 close(ctdb->recovery_lock_fd);
2809                 ctdb->recovery_lock_fd = -1;
2810         }
2811
2812         talloc_free(tmp_ctx);
2813         return 0;
2814 }
2815                 
2816 /*
2817   the main monitoring loop
2818  */
2819 static void monitor_cluster(struct ctdb_context *ctdb)
2820 {
2821         uint32_t pnn;
2822         TALLOC_CTX *mem_ctx=NULL;
2823         struct ctdb_node_map *nodemap=NULL;
2824         struct ctdb_node_map *recmaster_nodemap=NULL;
2825         struct ctdb_node_map **remote_nodemaps=NULL;
2826         struct ctdb_vnn_map *vnnmap=NULL;
2827         struct ctdb_vnn_map *remote_vnnmap=NULL;
2828         int32_t debug_level;
2829         int i, j, ret;
2830         struct ctdb_recoverd *rec;
2831
2832         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2833
2834         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2835         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2836
2837         rec->ctdb = ctdb;
2838
2839         rec->priority_time = timeval_current();
2840
2841         /* register a message port for sending memory dumps */
2842         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2843
2844         /* register a message port for recovery elections */
2845         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2846
2847         /* when nodes are disabled/enabled */
2848         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2849
2850         /* when we are asked to push out a flag change */
2851         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2852
2853         /* register a message port for vacuum fetch */
2854         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2855
2856         /* register a message port for reloadnodes  */
2857         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2858
2859         /* register a message port for performing a takeover run */
2860         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2861
2862         /* register a message port for disabling the ip check for a short while */
2863         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2864
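             /* the body below runs once per monitoring interval; every error
                path simply jumps back to this label and retries on the next
                iteration */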
2865 again:
2866         if (mem_ctx) {
2867                 talloc_free(mem_ctx);
2868                 mem_ctx = NULL;
2869         }
2870         mem_ctx = talloc_new(ctdb);
2871         if (!mem_ctx) {
2872                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2873                 exit(-1);
2874         }
2875
2876         /* we only check for recovery once every recover_interval (by default one second) */
2877         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2878
2879         /* verify that the main daemon is still running */
2880         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2881                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2882                 exit(-1);
2883         }
2884
2885         /* ping the local daemon to tell it we are alive */
2886         ctdb_ctrl_recd_ping(ctdb);
2887
2888         if (rec->election_timeout) {
2889                 /* an election is in progress */
2890                 goto again;
2891         }
2892
2893         /* read the debug level from the parent and update locally */
2894         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2895         if (ret !=0) {
2896                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2897                 goto again;
2898         }
2899         LogLevel = debug_level;
2900
2901
2902         /* We must check if we need to ban a node here, but we want to do this
2903            as early as possible so we don't wait until we have pulled the node
2904            map from the local node. That's why we have the hardcoded value 20.
2905         */
2906         for (i=0; i<ctdb->num_nodes; i++) {
2907                 struct ctdb_banning_state *ban_state;
2908
2909                 if (ctdb->nodes[i]->ban_state == NULL) {
2910                         continue;
2911                 }
2912                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2913                 if (ban_state->count < 20) {
2914                         continue;
2915                 }
2916                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2917                         ctdb->nodes[i]->pnn, ban_state->count,
2918                         ctdb->tunable.recovery_ban_period));
2919                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2920                 ban_state->count = 0;
2921         }
2922
2923         /* get relevant tunables */
2924         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2925         if (ret != 0) {
2926                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2927                 goto again;
2928         }
2929
2930         /* get the current recovery lock file from the server */
2931         if (update_recovery_lock_file(ctdb) != 0) {
2932                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2933                 goto again;
2934         }
2935
2936         /* Make sure that if recovery lock verification becomes disabled,
2937            we close the recovery lock file
2938         */
2939         if (ctdb->tunable.verify_recovery_lock == 0) {
2940                 if (ctdb->recovery_lock_fd != -1) {
2941                         close(ctdb->recovery_lock_fd);
2942                         ctdb->recovery_lock_fd = -1;
2943                 }
2944         }
2945
2946         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2947         if (pnn == (uint32_t)-1) {
2948                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2949                 goto again;
2950         }
2951
2952         /* get the vnnmap */
2953         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2954         if (ret != 0) {
2955                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2956                 goto again;
2957         }
2958
2959
2960         /* get number of nodes */
2961         if (rec->nodemap) {
2962                 talloc_free(rec->nodemap);
2963                 rec->nodemap = NULL;
2964                 nodemap=NULL;
2965         }
2966         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2967         if (ret != 0) {
2968                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2969                 goto again;
2970         }
2971         nodemap = rec->nodemap;
2972
2973         /* check which node is the recovery master */
2974         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2975         if (ret != 0) {
2976                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2977                 goto again;
2978         }
2979
2980         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2981         if (rec->recmaster != pnn) {
2982                 if (rec->ip_reallocate_ctx != NULL) {
2983                         talloc_free(rec->ip_reallocate_ctx);
2984                         rec->ip_reallocate_ctx = NULL;
2985                         rec->reallocate_callers = NULL;
2986                 }
2987         }
2988         /* if there are takeovers requested, perform them and notify the waiters */
2989         if (rec->reallocate_callers) {
2990                 process_ipreallocate_requests(ctdb, rec);
2991         }
2992
2993         if (rec->recmaster == (uint32_t)-1) {
2994                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2995                 force_election(rec, pnn, nodemap);
2996                 goto again;
2997         }
2998
2999
3000         /* if the local daemon is STOPPED, we verify that the databases are
3001            also frozen and that the recmode is set to active
3002         */
3003         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3004                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3005                 if (ret != 0) {
3006                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3007                 }
3008                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3009                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3010
3011                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3012                         if (ret != 0) {
3013                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3014                                 goto again;
3015                         }
3016                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3017                         if (ret != 0) {
3018                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3019
3020                                 goto again;
3021                         }
3022                         goto again;
3023                 }
3024         }
3025         /* If the local node is stopped and we are the recmaster,
3026            yield that role by forcing a new election
3027         */
3028         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3029                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3030                 force_election(rec, pnn, nodemap);
3031                 goto again;
3032         }
3033         
3034         /* check that we (recovery daemon) and the local ctdb daemon
3035            agree on whether we are banned or not
3036         */
3037 /* TODO: this consistency check is not yet implemented */
3038
3039         /* remember our own node flags */
3040         rec->node_flags = nodemap->nodes[pnn].flags;
3041
3042         /* count how many active nodes there are */
3043         rec->num_active    = 0;
3044         rec->num_connected = 0;
3045         for (i=0; i<nodemap->num; i++) {
3046                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3047                         rec->num_active++;
3048                 }
3049                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3050                         rec->num_connected++;
3051                 }
3052         }
3053
3054
3055         /* verify that the recmaster node is still active */
3056         for (j=0; j<nodemap->num; j++) {
3057                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3058                         break;
3059                 }
3060         }
3061
3062         if (j == nodemap->num) {
3063                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3064                 force_election(rec, pnn, nodemap);
3065                 goto again;
3066         }
3067
3068         /* if recovery master is disconnected we must elect a new recmaster */
3069         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3070                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3071                 force_election(rec, pnn, nodemap);
3072                 goto again;
3073         }
3074
3075         /* grab the nodemap from the recovery master to check if it is banned */
3076         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3077                                    mem_ctx, &recmaster_nodemap);
3078         if (ret != 0) {
3079                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3080                           nodemap->nodes[j].pnn));
3081                 goto again;
3082         }
3083
3084
3085         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3086                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3087                 force_election(rec, pnn, nodemap);
3088                 goto again;
3089         }
3090
3091
3092         /* verify that we have all the ip addresses we should have and that
3093          * we don't have addresses we shouldn't have.
3094          */
3095         if (ctdb->do_checkpublicip) {
3096                 if (rec->ip_check_disable_ctx == NULL) {
3097                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3098                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3099                         }
3100                 }
3101         }
3102
3103
3104         /* if we are not the recmaster then we do not need to check
3105            if recovery is needed
3106          */
3107         if (pnn != rec->recmaster) {
3108                 goto again;
3109         }
3110
3111
3112         /* ensure our local copies of flags are right */
3113         ret = update_local_flags(rec, nodemap);
3114         if (ret == MONITOR_ELECTION_NEEDED) {
3115                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3116                 force_election(rec, pnn, nodemap);
3117                 goto again;
3118         }
3119         if (ret != MONITOR_OK) {
3120                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3121                 goto again;
3122         }
3123
3124         if (ctdb->num_nodes != nodemap->num) {
3125                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3126                 reload_nodes_file(ctdb);
3127                 goto again;
3128         }
3129
3130         /* verify that all active nodes agree that we are the recmaster */
3131         switch (verify_recmaster(rec, nodemap, pnn)) {
3132         case MONITOR_RECOVERY_NEEDED:
3133                 /* cannot happen */
3134                 goto again;
3135         case MONITOR_ELECTION_NEEDED:
3136                 force_election(rec, pnn, nodemap);
3137                 goto again;
3138         case MONITOR_OK:
3139                 break;
3140         case MONITOR_FAILED:
3141                 goto again;
3142         }
3143
3144
3145         if (rec->need_recovery) {
3146                 /* a previous recovery didn't finish */
3147                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3148                 goto again;             
3149         }
3150
3151         /* verify that all active nodes are in normal mode 
3152            and not in recovery mode 
3153         */
3154         switch (verify_recmode(ctdb, nodemap)) {
3155         case MONITOR_RECOVERY_NEEDED:
3156                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3157                 goto again;
3158         case MONITOR_FAILED:
3159                 goto again;
3160         case MONITOR_ELECTION_NEEDED:
3161                 /* cannot happen */
3162         case MONITOR_OK:
3163                 break;
3164         }
3165
3166
3167         if (ctdb->tunable.verify_recovery_lock != 0) {
3168                 /* we should hold the reclock - check it is not stale */
3169                 ret = check_recovery_lock(ctdb);
3170                 if (ret != 0) {
3171                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3172                         ctdb_set_culprit(rec, ctdb->pnn);
3173                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3174                         goto again;
3175                 }
3176         }
3177
3178         /* get the nodemap for all active remote nodes
3179          */
3180         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3181         if (remote_nodemaps == NULL) {
3182                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3183                 goto again;
3184         }
3185         for(i=0; i<nodemap->num; i++) {
3186                 remote_nodemaps[i] = NULL;
3187         }
3188         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3189                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3190                 goto again;
3191         } 
3192
3193         /* verify that all other nodes have the same nodemap as we have
3194         */
3195         for (j=0; j<nodemap->num; j++) {
3196                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3197                         continue;
3198                 }
3199
3200                 if (remote_nodemaps[j] == NULL) {
3201                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3202                         ctdb_set_culprit(rec, j);
3203
3204                         goto again;
3205                 }
3206
3207                 /* if the nodes disagree on how many nodes there are
3208                    then this is a good reason to try recovery
3209                  */
3210                 if (remote_nodemaps[j]->num != nodemap->num) {
3211                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3212                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3213                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3214                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3215                         goto again;
3216                 }
3217
3218                 /* if the nodes disagree on which nodes exist and are
3219                    active, then that is also a good reason to do recovery
3220                  */
3221                 for (i=0;i<nodemap->num;i++) {
3222                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3223                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3224                                           nodemap->nodes[j].pnn, i, 
3225                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3226                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3227                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3228                                             vnnmap);
3229                                 goto again;
3230                         }
3231                 }
3232
3233                 /* verify the flags are consistent
3234                 */
3235                 for (i=0; i<nodemap->num; i++) {
3236                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3237                                 continue;
3238                         }
3239                         
3240                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3241                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3242                                   nodemap->nodes[j].pnn, 
3243                                   nodemap->nodes[i].pnn, 
3244                                   remote_nodemaps[j]->nodes[i].flags,
3245                                   nodemap->nodes[i].flags));
3246                                 if (i == j) {
3247                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3248                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3249                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3250                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3251                                                     vnnmap);
3252                                         goto again;
3253                                 } else {
3254                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3255                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3256                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3257                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3258                                                     vnnmap);
3259                                         goto again;
3260                                 }
3261                         }
3262                 }
3263         }
3264
3265
3266         /* there had better be the same number of lmasters in the vnn map
3267            as there are active nodes, or we will have to do a recovery
3268          */
3269         if (vnnmap->size != rec->num_active) {
3270                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3271                           vnnmap->size, rec->num_active));
3272                 ctdb_set_culprit(rec, ctdb->pnn);
3273                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3274                 goto again;
3275         }
3276
3277         /* verify that all active nodes in the nodemap also exist in 
3278            the vnnmap.
3279          */
3280         for (j=0; j<nodemap->num; j++) {
3281                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3282                         continue;
3283                 }
3284                 if (nodemap->nodes[j].pnn == pnn) {
3285                         continue;
3286                 }
3287
3288                 for (i=0; i<vnnmap->size; i++) {
3289                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3290                                 break;
3291                         }
3292                 }
3293                 if (i == vnnmap->size) {
3294                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3295                                   nodemap->nodes[j].pnn));
3296                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3297                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3298                         goto again;
3299                 }
3300         }
3301
3302         
3303         /* verify that all other nodes have the same vnnmap
3304            and are from the same generation
3305          */
3306         for (j=0; j<nodemap->num; j++) {
3307                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3308                         continue;
3309                 }
3310                 if (nodemap->nodes[j].pnn == pnn) {
3311                         continue;
3312                 }
3313
3314                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3315                                           mem_ctx, &remote_vnnmap);
3316                 if (ret != 0) {
3317                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3318                                   nodemap->nodes[j].pnn));
3319                         goto again;
3320                 }
3321
3322                 /* verify the vnnmap generation is the same */
3323                 if (vnnmap->generation != remote_vnnmap->generation) {
3324                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3325                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3326                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3327                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3328                         goto again;
3329                 }
3330
3331                 /* verify the vnnmap size is the same */
3332                 if (vnnmap->size != remote_vnnmap->size) {
3333                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3334                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3335                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3336                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3337                         goto again;
3338                 }
3339
3340                 /* verify the vnnmap is the same */
3341                 for (i=0;i<vnnmap->size;i++) {
3342                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3343                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3344                                           nodemap->nodes[j].pnn));
3345                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3346                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3347                                             vnnmap);
3348                                 goto again;
3349                         }
3350                 }
3351         }
3352
3353         /* we might need to change who has what IP assigned */
3354         if (rec->need_takeover_run) {
3355                 uint32_t culprit = (uint32_t)-1;
3356
3357                 rec->need_takeover_run = false;
3358
3359                 /* update the list of public ips that a node can handle for
3360                    all connected nodes
3361                 */
3362                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3363                 if (ret != 0) {
3364                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3365                                          culprit));
3366                         ctdb_set_culprit(rec, culprit);
3367                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3368                         goto again;
3369                 }
3370
3371                 /* execute the "startrecovery" event script on all nodes */
3372                 ret = run_startrecovery_eventscript(rec, nodemap);
3373                 if (ret!=0) {
3374                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3375                         ctdb_set_culprit(rec, ctdb->pnn);
3376                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3377                         goto again;
3378                 }
3379
3380                 ret = ctdb_takeover_run(ctdb, nodemap);
3381                 if (ret != 0) {
3382                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3383                         ctdb_set_culprit(rec, ctdb->pnn);
3384                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3385                         goto again;
3386                 }
3387
3388                 /* execute the "recovered" event script on all nodes */
3389                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3390 #if 0
3391 // we can't check whether the event completed successfully
3392 // since this script WILL fail if the node is in recovery mode
3393 // and if that race happens, the code here would just cause a second
3394 // cascading recovery.
3395                 if (ret!=0) {
3396                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3397                         ctdb_set_culprit(rec, ctdb->pnn);
3398                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3399                 }
3400 #endif
3401         }
3402
3403
3404         goto again;
3405
3406 }
3407
3408 /*
3409   event handler for when the main ctdbd dies
3410  */
3411 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3412                                  uint16_t flags, void *private_data)
3413 {
3414         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3415         _exit(1);
3416 }
3417
3418 /*
3419   called regularly to verify that the recovery daemon is still running
3420  */
3421 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3422                               struct timeval yt, void *p)
3423 {
3424         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3425
3426         if (kill(ctdb->recoverd_pid, 0) != 0) {
3427                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3428
3429                 ctdb_stop_recoverd(ctdb);
3430                 ctdb_stop_keepalive(ctdb);
3431                 ctdb_stop_monitoring(ctdb);
3432                 ctdb_release_all_ips(ctdb);
3433                 if (ctdb->methods != NULL) {
3434                         ctdb->methods->shutdown(ctdb);
3435                 }
3436                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3437
3438                 exit(10);       
3439         }
3440
3441         event_add_timed(ctdb->ev, ctdb, 
3442                         timeval_current_ofs(30, 0),
3443                         ctdb_check_recd, ctdb);
3444 }
3445
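     /*
       SIGCHLD handler for the recovery daemon - reap any children that have
       exited so they do not remain as zombies
      */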
3446 static void recd_sig_child_handler(struct event_context *ev,
3447         struct signal_event *se, int signum, int count,
3448         void *dont_care, 
3449         void *private_data)
3450 {
3451 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3452         int status;
3453         pid_t pid = -1;
3454
3455         while (pid != 0) {
3456                 pid = waitpid(-1, &status, WNOHANG);
3457                 if (pid == -1) {
3458                         if (errno != ECHILD) {
3459                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3460                         }
3461                         return;
3462                 }
3463                 if (pid > 0) {
3464                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3465                 }
3466         }
3467 }
3468
3469 /*
3470   startup the recovery daemon as a child of the main ctdb daemon
3471  */
3472 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3473 {
3474         int fd[2];
3475         struct signal_event *se;
3476
3477         if (pipe(fd) != 0) {
3478                 return -1;
3479         }
3480
3481         ctdb->ctdbd_pid = getpid();
3482
3483         ctdb->recoverd_pid = fork();
3484         if (ctdb->recoverd_pid == -1) {
3485                 return -1;
3486         }
3487         
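             /* parent: close the read end of the pipe and keep the write end
                open; when we exit the child sees EOF on its end and shuts down
                (see ctdb_recoverd_parent above) */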
3488         if (ctdb->recoverd_pid != 0) {
3489                 close(fd[0]);
3490                 event_add_timed(ctdb->ev, ctdb, 
3491                                 timeval_current_ofs(30, 0),
3492                                 ctdb_check_recd, ctdb);
3493                 return 0;
3494         }
3495
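             /* child (the recovery daemon): close the write end and watch the
                read end below so we notice when the parent exits */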
3496         close(fd[1]);
3497
3498         srandom(getpid() ^ time(NULL));
3499
3500         if (switch_from_server_to_client(ctdb) != 0) {
3501                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3502                 exit(1);
3503         }
3504
3505         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3506
3507         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3508                      ctdb_recoverd_parent, &fd[0]);     
3509
3510         /* set up a handler to pick up sigchld */
3511         se = event_add_signal(ctdb->ev, ctdb,
3512                                      SIGCHLD, 0,
3513                                      recd_sig_child_handler,
3514                                      ctdb);
3515         if (se == NULL) {
3516                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3517                 exit(1);
3518         }
3519
3520         monitor_cluster(ctdb);
3521
3522         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3523         return -1;
3524 }
3525
3526 /*
3527   shutdown the recovery daemon
3528  */
3529 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3530 {
3531         if (ctdb->recoverd_pid == 0) {
3532                 return;
3533         }
3534
3535         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3536         kill(ctdb->recoverd_pid, SIGTERM);
3537 }