server/ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
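/*
  track how many times a node has been a recovery culprit and when it
  last misbehaved, so that repeat offenders can be banned
 */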
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
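/* absolute timeouts for controls and for monitoring, derived from the
   recovery tunables */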
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
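/* possible outcomes when monitoring the state of the cluster */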
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
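/*
  callback for the GET_CAPABILITIES control - remember the capabilities
  reported by each node
 */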
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
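/* called for each node that failed to freeze during recovery - charge it
   one culprit credit per node in the cluster */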
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
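/* called for each node that failed to start the recovery transaction -
   charge it one culprit credit per node in the cluster */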
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
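                /* freeze the databases one priority level at a time */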
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have
348    this can fail if the remote node has not yet been upgraded to 
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* dont check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* dont check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
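                        /* only overwrite the existing record if the pulled
                           copy has a higher rsn, or the same rsn while the
                           existing record is not held by the recovery master
                        */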
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* dont merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* dont push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
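/*
  state for a batch of records, received from one remote node, that are
  being fetched back to this node as part of vacuuming
 */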
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* it's already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
891 {
892         uint32_t timed_out = 0;
893         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
894         while (!timed_out) {
895                 event_loop_once(ctdb->ev);
896         }
897 }
898
899 /*
900   called when an election times out (ends)
901  */
902 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
903                                   struct timeval t, void *p)
904 {
905         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
906         rec->election_timeout = NULL;
907
908         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
909 }
910
911
912 /*
913   wait for an election to finish. It finishes election_timeout seconds after
914   the last election packet is received
915  */
916 static void ctdb_wait_election(struct ctdb_recoverd *rec)
917 {
918         struct ctdb_context *ctdb = rec->ctdb;
919         while (rec->election_timeout) {
920                 event_loop_once(ctdb->ev);
921         }
922 }
923
924 /*
925   Update our local flags from all remote connected nodes. 
926   This is only run when we are, or believe we are, the recovery master
927  */
928 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
929 {
930         int j;
931         struct ctdb_context *ctdb = rec->ctdb;
932         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
933
934         /* get the nodemap for all active remote nodes and verify
935            they are the same as for this node
936          */
937         for (j=0; j<nodemap->num; j++) {
938                 struct ctdb_node_map *remote_nodemap=NULL;
939                 int ret;
940
941                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
942                         continue;
943                 }
944                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
945                         continue;
946                 }
947
948                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
949                                            mem_ctx, &remote_nodemap);
950                 if (ret != 0) {
951                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
952                                   nodemap->nodes[j].pnn));
953                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
954                         talloc_free(mem_ctx);
955                         return MONITOR_FAILED;
956                 }
957                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
958                         /* We should tell our daemon about this so it
959                            updates its flags or else we will log the same 
960                            message again in the next iteration of recovery.
961                            Since we are the recovery master we can just as
962                            well update the flags on all nodes.
963                         */
964                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
965                         if (ret != 0) {
966                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
967                                 return -1;
968                         }
969
970                         /* Update our local copy of the flags in the recovery
971                            daemon.
972                         */
973                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
974                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
975                                  nodemap->nodes[j].flags));
976                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
977                 }
978                 talloc_free(remote_nodemap);
979         }
980         talloc_free(mem_ctx);
981         return MONITOR_OK;
982 }
983
984
985 /* Create a new random generation id.
986    The generation id cannot be the INVALID_GENERATION id
987 */
988 static uint32_t new_generation(void)
989 {
990         uint32_t generation;
991
992         while (1) {
993                 generation = random();
994
995                 if (generation != INVALID_GENERATION) {
996                         break;
997                 }
998         }
999
1000         return generation;
1001 }
1002
1003
1004 /*
1005   create a temporary working database
1006  */
1007 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1008 {
1009         char *name;
1010         struct tdb_wrap *recdb;
1011         unsigned tdb_flags;
1012
1013         /* open up the temporary recovery database */
1014         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1015                                ctdb->db_directory_state,
1016                                ctdb->pnn);
1017         if (name == NULL) {
1018                 return NULL;
1019         }
1020         unlink(name);
1021
1022         tdb_flags = TDB_NOLOCK;
1023         if (ctdb->valgrinding) {
1024                 tdb_flags |= TDB_NOMMAP;
1025         }
1026         tdb_flags |= TDB_DISALLOW_NESTING;
1027
1028         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1029                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1030         if (recdb == NULL) {
1031                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1032         }
1033
1034         talloc_free(name);
1035
1036         return recdb;
1037 }
1038
1039
1040 /* 
1041    a traverse function for pulling all relevant records from recdb
1042  */
1043 struct recdb_data {
1044         struct ctdb_context *ctdb;
1045         struct ctdb_marshall_buffer *recdata;
1046         uint32_t len;
1047         bool failed;
1048         bool persistent;
1049 };
1050
1051 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1052 {
1053         struct recdb_data *params = (struct recdb_data *)p;
1054         struct ctdb_rec_data *rec;
1055         struct ctdb_ltdb_header *hdr;
1056
1057         /* skip empty records */
1058         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1059                 return 0;
1060         }
1061
1062         /* update the dmaster field to point to us */
1063         hdr = (struct ctdb_ltdb_header *)data.dptr;
1064         if (!params->persistent) {
1065                 hdr->dmaster = params->ctdb->pnn;
1066         }
1067
1068         /* add the record to the blob ready to send to the nodes */
1069         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1070         if (rec == NULL) {
1071                 params->failed = true;
1072                 return -1;
1073         }
1074         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1075         if (params->recdata == NULL) {
1076                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1077                          rec->length + params->len, params->recdata->count));
1078                 params->failed = true;
1079                 return -1;
1080         }
1081         params->recdata->count++;
1082         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1083         params->len += rec->length;
1084         talloc_free(rec);
1085
1086         return 0;
1087 }
1088
1089 /*
1090   push the recdb database out to all nodes
1091  */
1092 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1093                                bool persistent,
1094                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1095 {
1096         struct recdb_data params;
1097         struct ctdb_marshall_buffer *recdata;
1098         TDB_DATA outdata;
1099         TALLOC_CTX *tmp_ctx;
1100         uint32_t *nodes;
1101
1102         tmp_ctx = talloc_new(ctdb);
1103         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1104
1105         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1106         CTDB_NO_MEMORY(ctdb, recdata);
1107
1108         recdata->db_id = dbid;
1109
1110         params.ctdb = ctdb;
1111         params.recdata = recdata;
1112         params.len = offsetof(struct ctdb_marshall_buffer, data);
1113         params.failed = false;
1114         params.persistent = persistent;
1115
1116         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1118                 talloc_free(params.recdata);
1119                 talloc_free(tmp_ctx);
1120                 return -1;
1121         }
1122
1123         if (params.failed) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1125                 talloc_free(params.recdata);
1126                 talloc_free(tmp_ctx);
1127                 return -1;              
1128         }
1129
1130         recdata = params.recdata;
1131
1132         outdata.dptr = (void *)recdata;
1133         outdata.dsize = params.len;
1134
1135         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1136         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1137                                         nodes, 0,
1138                                         CONTROL_TIMEOUT(), false, outdata,
1139                                         NULL, NULL,
1140                                         NULL) != 0) {
1141                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1142                 talloc_free(recdata);
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1148                   dbid, recdata->count));
1149
1150         talloc_free(recdata);
1151         talloc_free(tmp_ctx);
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   go through a full recovery on one database 
1159  */
1160 static int recover_database(struct ctdb_recoverd *rec, 
1161                             TALLOC_CTX *mem_ctx,
1162                             uint32_t dbid,
1163                             bool persistent,
1164                             uint32_t pnn, 
1165                             struct ctdb_node_map *nodemap,
1166                             uint32_t transaction_id)
1167 {
1168         struct tdb_wrap *recdb;
1169         int ret;
1170         struct ctdb_context *ctdb = rec->ctdb;
1171         TDB_DATA data;
1172         struct ctdb_control_wipe_database w;
1173         uint32_t *nodes;
1174
1175         recdb = create_recdb(ctdb, mem_ctx);
1176         if (recdb == NULL) {
1177                 return -1;
1178         }
1179
1180         /* pull all remote databases onto the recdb */
1181         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1182         if (ret != 0) {
1183                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1184                 return -1;
1185         }
1186
1187         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1188
1189         /* wipe all the remote databases. This is safe as we are in a transaction */
1190         w.db_id = dbid;
1191         w.transaction_id = transaction_id;
1192
1193         data.dptr = (void *)&w;
1194         data.dsize = sizeof(w);
1195
1196         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1198                                         nodes, 0,
1199                                         CONTROL_TIMEOUT(), false, data,
1200                                         NULL, NULL,
1201                                         NULL) != 0) {
1202                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1203                 talloc_free(recdb);
1204                 return -1;
1205         }
1206         
1207         /* push out the correct database. This sets the dmaster and skips 
1208            the empty records */
1209         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1210         if (ret != 0) {
1211                 talloc_free(recdb);
1212                 return -1;
1213         }
1214
1215         /* all done with this database */
1216         talloc_free(recdb);
1217
1218         return 0;
1219 }
1220
1221 /*
1222   reload the nodes file 
1223 */
1224 static void reload_nodes_file(struct ctdb_context *ctdb)
1225 {
1226         ctdb->nodes = NULL;
1227         ctdb_load_nodes_file(ctdb);
1228 }
1229
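/*
  refresh the lists of known and available public ips for all active nodes,
  and flag a takeover run if any allocation looks inconsistent
 */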
1230 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1231                                          struct ctdb_recoverd *rec,
1232                                          struct ctdb_node_map *nodemap,
1233                                          uint32_t *culprit)
1234 {
1235         int j;
1236         int ret;
1237
1238         if (ctdb->num_nodes != nodemap->num) {
1239                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1240                                   ctdb->num_nodes, nodemap->num));
1241                 if (culprit) {
1242                         *culprit = ctdb->pnn;
1243                 }
1244                 return -1;
1245         }
1246
1247         for (j=0; j<nodemap->num; j++) {
1248                 /* release any existing data */
1249                 if (ctdb->nodes[j]->known_public_ips) {
1250                         talloc_free(ctdb->nodes[j]->known_public_ips);
1251                         ctdb->nodes[j]->known_public_ips = NULL;
1252                 }
1253                 if (ctdb->nodes[j]->available_public_ips) {
1254                         talloc_free(ctdb->nodes[j]->available_public_ips);
1255                         ctdb->nodes[j]->available_public_ips = NULL;
1256                 }
1257
1258                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1259                         continue;
1260                 }
1261
1262                 /* grab a new shiny list of public ips from the node */
1263                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1264                                         CONTROL_TIMEOUT(),
1265                                         ctdb->nodes[j]->pnn,
1266                                         ctdb->nodes,
1267                                         0,
1268                                         &ctdb->nodes[j]->known_public_ips);
1269                 if (ret != 0) {
1270                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1271                                 ctdb->nodes[j]->pnn));
1272                         if (culprit) {
1273                                 *culprit = ctdb->nodes[j]->pnn;
1274                         }
1275                         return -1;
1276                 }
1277
1278                 if (rec->ip_check_disable_ctx == NULL) {
1279                         if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1280                                 DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1281                                 rec->need_takeover_run = true;
1282                         }
1283                 }
1284
1285                 /* grab a new shiny list of public ips from the node */
1286                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1287                                         CONTROL_TIMEOUT(),
1288                                         ctdb->nodes[j]->pnn,
1289                                         ctdb->nodes,
1290                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1291                                         &ctdb->nodes[j]->available_public_ips);
1292                 if (ret != 0) {
1293                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1294                                 ctdb->nodes[j]->pnn));
1295                         if (culprit) {
1296                                 *culprit = ctdb->nodes[j]->pnn;
1297                         }
1298                         return -1;
1299                 }
1300         }
1301
1302         return 0;
1303 }
1304
1305 /*
1306   we are the recmaster, and recovery is needed - start a recovery run
1307  */
1308 static int do_recovery(struct ctdb_recoverd *rec, 
1309                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1310                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1311 {
1312         struct ctdb_context *ctdb = rec->ctdb;
1313         int i, j, ret;
1314         uint32_t generation;
1315         struct ctdb_dbid_map *dbmap;
1316         TDB_DATA data;
1317         uint32_t *nodes;
1318         struct timeval start_time;
1319         uint32_t culprit = (uint32_t)-1;
1320
1321         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1322
1323         /* if recovery fails, force it again */
1324         rec->need_recovery = true;
1325
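        /* ban any node that has recently caused at least 2*num_nodes
           recoveries */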
1326         for (i=0; i<ctdb->num_nodes; i++) {
1327                 struct ctdb_banning_state *ban_state;
1328
1329                 if (ctdb->nodes[i]->ban_state == NULL) {
1330                         continue;
1331                 }
1332                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1333                 if (ban_state->count < 2*ctdb->num_nodes) {
1334                         continue;
1335                 }
1336                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1337                         ctdb->nodes[i]->pnn, ban_state->count,
1338                         ctdb->tunable.recovery_ban_period));
1339                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1340                 ban_state->count = 0;
1341         }
1342
1343
1344         if (ctdb->tunable.verify_recovery_lock != 0) {
1345                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1346                 start_time = timeval_current();
1347                 if (!ctdb_recovery_lock(ctdb, true)) {
1348                         ctdb_set_culprit(rec, pnn);
1349                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1350                         return -1;
1351                 }
1352                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1353                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1354         }
1355
1356         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1357
1358         /* get a list of all databases */
1359         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1360         if (ret != 0) {
1361                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1362                 return -1;
1363         }
1364
1365         /* we do the db creation before we set the recovery mode, so the freeze happens
1366            on all databases we will be dealing with. */
1367
1368         /* verify that we have all the databases any other node has */
1369         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1370         if (ret != 0) {
1371                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1372                 return -1;
1373         }
1374
1375         /* verify that all other nodes have all our databases */
1376         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1377         if (ret != 0) {
1378                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1379                 return -1;
1380         }
1381         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1382
1383         /* update the database priority for all remote databases */
1384         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1385         if (ret != 0) {
1386                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1387         }
1388         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1389
1390
1391         /* set recovery mode to active on all nodes */
1392         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1393         if (ret != 0) {
1394                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1395                 return -1;
1396         }
1397
1398         /* execute the "startrecovery" event script on all nodes */
1399         ret = run_startrecovery_eventscript(rec, nodemap);
1400         if (ret!=0) {
1401                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1402                 return -1;
1403         }
1404
1405         /*
1406           update all nodes to have the same flags that we have
1407          */
1408         for (i=0;i<nodemap->num;i++) {
1409                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1410                         continue;
1411                 }
1412
1413                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1414                 if (ret != 0) {
1415                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1416                         return -1;
1417                 }
1418         }
1419
1420         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1421
1422         /* pick a new generation number */
1423         generation = new_generation();
1424
1425         /* change the vnnmap on this node to use the new generation 
1426            number but not on any other nodes.
1427            this guarantees that if we abort the recovery prematurely
1428            for some reason (a node stops responding?)
1429            that we can just return immediately and we will reenter
1430            recovery shortly again.
1431            I.e. we deliberately leave the cluster with an inconsistent
1432            generation id to allow us to abort recovery at any stage and
1433            just restart it from scratch.
1434          */
1435         vnnmap->generation = generation;
1436         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1437         if (ret != 0) {
1438                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1439                 return -1;
1440         }
1441
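        /* the new generation id is passed as the data blob of the
           TRANSACTION_START control so that every active node starts its
           recovery transaction against the same generation */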
1442         data.dptr = (void *)&generation;
1443         data.dsize = sizeof(uint32_t);
1444
1445         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1446         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1447                                         nodes, 0,
1448                                         CONTROL_TIMEOUT(), false, data,
1449                                         NULL,
1450                                         transaction_start_fail_callback,
1451                                         rec) != 0) {
1452                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1453                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1454                                         nodes, 0,
1455                                         CONTROL_TIMEOUT(), false, tdb_null,
1456                                         NULL,
1457                                         NULL,
1458                                         NULL) != 0) {
1459                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1460                 }
1461                 return -1;
1462         }
1463
1464         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1465
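        /* recover each database in turn; a failure for any single database
           aborts the whole recovery so it can be restarted from scratch */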
1466         for (i=0;i<dbmap->num;i++) {
1467                 ret = recover_database(rec, mem_ctx,
1468                                        dbmap->dbs[i].dbid,
1469                                        dbmap->dbs[i].persistent,
1470                                        pnn, nodemap, generation);
1471                 if (ret != 0) {
1472                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1473                         return -1;
1474                 }
1475         }
1476
1477         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1478
1479         /* commit all the changes */
1480         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1481                                         nodes, 0,
1482                                         CONTROL_TIMEOUT(), false, data,
1483                                         NULL, NULL,
1484                                         NULL) != 0) {
1485                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1486                 return -1;
1487         }
1488
1489         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1490         
1491
1492         /* update the capabilities for all nodes */
1493         ret = update_capabilities(ctdb, nodemap);
1494         if (ret!=0) {
1495                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1496                 return -1;
1497         }
1498
1499         /* build a new vnn map with all the currently active and
1500            unbanned nodes */
1501         generation = new_generation();
1502         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1503         CTDB_NO_MEMORY(ctdb, vnnmap);
1504         vnnmap->generation = generation;
1505         vnnmap->size = 0;
1506         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1507         CTDB_NO_MEMORY(ctdb, vnnmap->map);
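        /* only active nodes that advertise the LMASTER capability are added
           to the new vnnmap; j tracks the next free slot in the map */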
1508         for (i=j=0;i<nodemap->num;i++) {
1509                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1510                         continue;
1511                 }
1512                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1513                         /* this node cannot be an lmaster */
1514                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1515                         continue;
1516                 }
1517
1518                 vnnmap->size++;
1519                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1520                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1521                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1522
1523         }
1524         if (vnnmap->size == 0) {
1525                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1526                 vnnmap->size++;
1527                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1528                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1529                 vnnmap->map[0] = pnn;
1530         }       
1531
1532         /* update to the new vnnmap on all nodes */
1533         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1534         if (ret != 0) {
1535                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1536                 return -1;
1537         }
1538
1539         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1540
1541         /* update recmaster to point to us for all nodes */
1542         ret = set_recovery_master(ctdb, nodemap, pnn);
1543         if (ret!=0) {
1544                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1545                 return -1;
1546         }
1547
1548         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1549
1550         /*
1551           update all nodes to have the same flags that we have
1552          */
1553         for (i=0;i<nodemap->num;i++) {
1554                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1555                         continue;
1556                 }
1557
1558                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1559                 if (ret != 0) {
1560                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1561                         return -1;
1562                 }
1563         }
1564
1565         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1566
1567         /* disable recovery mode */
1568         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1569         if (ret != 0) {
1570                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1571                 return -1;
1572         }
1573
1574         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1575
1576         /*
1577           tell nodes to takeover their public IPs
1578          */
1579         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1580         if (ret != 0) {
1581                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1582                                  culprit));
1583                 return -1;
1584         }
1585         rec->need_takeover_run = false;
1586         ret = ctdb_takeover_run(ctdb, nodemap);
1587         if (ret != 0) {
1588                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1589                 return -1;
1590         }
1591         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1592
1593         /* execute the "recovered" event script on all nodes */
1594         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1595         if (ret!=0) {
1596                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1597                 return -1;
1598         }
1599
1600         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1601
1602         /* send a message to all clients telling them that the cluster 
1603            has been reconfigured */
1604         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1605
1606         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1607
1608         rec->need_recovery = false;
1609
1610         /* we managed to complete a full recovery, make sure to forgive
1611            any past sins by the nodes that could now participate in the
1612            recovery.
1613         */
1614         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1615         for (i=0;i<nodemap->num;i++) {
1616                 struct ctdb_banning_state *ban_state;
1617
1618                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1619                         continue;
1620                 }
1621
1622                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1623                 if (ban_state == NULL) {
1624                         continue;
1625                 }
1626
1627                 ban_state->count = 0;
1628         }
1629
1630
1631         /* We just finished a recovery successfully. 
1632            We now wait for rerecovery_timeout before we allow 
1633            another recovery to take place.
1634         */
1635         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1636         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1637         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1638
1639         return 0;
1640 }
1641
1642
1643 /*
1644   elections are won by first checking the number of connected nodes, then
1645   the priority time, then the pnn
1646  */
1647 struct election_message {
1648         uint32_t num_connected;
1649         struct timeval priority_time;
1650         uint32_t pnn;
1651         uint32_t node_flags;
1652 };
1653
1654 /*
1655   form this node's election data
1656  */
1657 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1658 {
1659         int ret, i;
1660         struct ctdb_node_map *nodemap;
1661         struct ctdb_context *ctdb = rec->ctdb;
1662
1663         ZERO_STRUCTP(em);
1664
1665         em->pnn = rec->ctdb->pnn;
1666         em->priority_time = rec->priority_time;
1667
1668         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1669         if (ret != 0) {
1670                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1671                 return;
1672         }
1673
1674         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1675         em->node_flags = rec->node_flags;
1676
1677         for (i=0;i<nodemap->num;i++) {
1678                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1679                         em->num_connected++;
1680                 }
1681         }
1682
1683         /* we shouldn't try to win this election if we can't be a recmaster */
1684         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1685                 em->num_connected = 0;
1686                 em->priority_time = timeval_current();
1687         }
1688
1689         talloc_free(nodemap);
1690 }
1691
1692 /*
1693   see if the given election data wins
1694  */
1695 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1696 {
1697         struct election_message myem;
1698         int cmp = 0;
1699
1700         ctdb_election_data(rec, &myem);
1701
1702         /* we can't win if we don't have the recmaster capability */
1703         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1704                 return false;
1705         }
1706
1707         /* we can't win if we are banned */
1708         if (rec->node_flags & NODE_FLAGS_BANNED) {
1709                 return false;
1710         }       
1711
1712         /* we can't win if we are stopped */
1713         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1714                 return false;
1715         }       
1716
1717         /* we will automatically win if the other node is banned */
1718         if (em->node_flags & NODE_FLAGS_BANNED) {
1719                 return true;
1720         }
1721
1722         /* we will automatically win if the other node is stopped */
1723         if (em->node_flags & NODE_FLAGS_STOPPED) {
1724                 return true;
1725         }
1726
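        /* cmp > 0 means our own election data wins; each tie-breaker below
           is only consulted when the previous ones compare equal */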
1727         /* try to use the most connected node */
1728         if (cmp == 0) {
1729                 cmp = (int)myem.num_connected - (int)em->num_connected;
1730         }
1731
1732         /* then the longest running node */
1733         if (cmp == 0) {
1734                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1735         }
1736
1737         if (cmp == 0) {
1738                 cmp = (int)myem.pnn - (int)em->pnn;
1739         }
1740
1741         return cmp > 0;
1742 }
1743
1744 /*
1745   send out an election request
1746  */
1747 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1748 {
1749         int ret;
1750         TDB_DATA election_data;
1751         struct election_message emsg;
1752         uint64_t srvid;
1753         struct ctdb_context *ctdb = rec->ctdb;
1754
1755         srvid = CTDB_SRVID_RECOVERY;
1756
1757         ctdb_election_data(rec, &emsg);
1758
1759         election_data.dsize = sizeof(struct election_message);
1760         election_data.dptr  = (unsigned char *)&emsg;
1761
1762
1763         /* send an election message to all active nodes */
1764         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1765         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1766
1767
1768         /* A new node that is already frozen has entered the cluster.
1769            The existing nodes are not frozen and don't need to be frozen
1770            until the election has ended and we start the actual recovery
1771         */
1772         if (update_recmaster == true) {
1773                 /* first we assume we will win the election and set 
1774                    recoverymaster to be ourself on the current node
1775                  */
1776                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1777                 if (ret != 0) {
1778                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1779                         return -1;
1780                 }
1781         }
1782
1783
1784         return 0;
1785 }
1786
1787 /*
1788   this function will unban all nodes in the cluster
1789 */
1790 static void unban_all_nodes(struct ctdb_context *ctdb)
1791 {
1792         int ret, i;
1793         struct ctdb_node_map *nodemap;
1794         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1795         
1796         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1797         if (ret != 0) {
1798                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1799                 return;
1800         }
1801
1802         for (i=0;i<nodemap->num;i++) {
1803                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1804                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1805                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1806                 }
1807         }
1808
1809         talloc_free(tmp_ctx);
1810 }
1811
1812
1813 /*
1814   we think we are winning the election - send a broadcast election request
1815  */
1816 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1817 {
1818         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1819         int ret;
1820
1821         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1822         if (ret != 0) {
1823                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1824         }
1825
1826         talloc_free(rec->send_election_te);
1827         rec->send_election_te = NULL;
1828 }
1829
1830 /*
1831   handler for memory dumps
1832 */
1833 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1834                              TDB_DATA data, void *private_data)
1835 {
1836         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1837         TDB_DATA *dump;
1838         int ret;
1839         struct rd_memdump_reply *rd;
1840
1841         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1842                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1843                 talloc_free(tmp_ctx);
1844                 return;
1845         }
1846         rd = (struct rd_memdump_reply *)data.dptr;
1847
1848         dump = talloc_zero(tmp_ctx, TDB_DATA);
1849         if (dump == NULL) {
1850                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1851                 talloc_free(tmp_ctx);
1852                 return;
1853         }
1854         ret = ctdb_dump_memory(ctdb, dump);
1855         if (ret != 0) {
1856                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1857                 talloc_free(tmp_ctx);
1858                 return;
1859         }
1860
1861         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1862
1863         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1864         if (ret != 0) {
1865                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1866                 talloc_free(tmp_ctx);
1867                 return;
1868         }
1869
1870         talloc_free(tmp_ctx);
1871 }
1872
1873 /*
1874   handler for reload_nodes
1875 */
1876 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1877                              TDB_DATA data, void *private_data)
1878 {
1879         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1880
1881         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1882
1883         reload_nodes_file(rec->ctdb);
1884 }
1885
1886
1887 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1888                               struct timeval yt, void *p)
1889 {
1890         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1891
1892         talloc_free(rec->ip_check_disable_ctx);
1893         rec->ip_check_disable_ctx = NULL;
1894 }
1895
1896
1897 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1898                              TDB_DATA data, void *private_data)
1899 {
1900         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1901         struct ctdb_public_ip *ip;
1902
1903         if (rec->recmaster != rec->ctdb->pnn) {
1904                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1905                 return;
1906         }
1907
1908         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1909                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1910                 return;
1911         }
1912
1913         ip = (struct ctdb_public_ip *)data.dptr;
1914
1915         update_ip_assignment_tree(rec->ctdb, ip);
1916 }
1917
1918
1919 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1920                              TDB_DATA data, void *private_data)
1921 {
1922         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1923         uint32_t timeout;
1924
1925         if (rec->ip_check_disable_ctx != NULL) {
1926                 talloc_free(rec->ip_check_disable_ctx);
1927                 rec->ip_check_disable_ctx = NULL;
1928         }
1929
1930         if (data.dsize != sizeof(uint32_t)) {
1931                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1932                                  "expecting %lu\n", (long unsigned)data.dsize,
1933                                  (long unsigned)sizeof(uint32_t)));
1934                 return;
1935         }
1936         if (data.dptr == NULL) {
1937                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1938                 return;
1939         }
1940
1941         timeout = *((uint32_t *)data.dptr);
1942         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1943
1944         rec->ip_check_disable_ctx = talloc_new(rec);
1945         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1946
1947         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1948 }
1949
1950
1951 /*
1952   handler for ip reallocate, just add it to the list of callers and 
1953   handle this later in the monitor_cluster loop so we do not recurse
1954   with other callers to takeover_run()
1955 */
1956 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1957                              TDB_DATA data, void *private_data)
1958 {
1959         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1960         struct ip_reallocate_list *caller;
1961
1962         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1963                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1964                 return;
1965         }
1966
1967         if (rec->ip_reallocate_ctx == NULL) {
1968                 rec->ip_reallocate_ctx = talloc_new(rec);
1969                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1970         }
1971
1972         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1973         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1974
1975         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1976         caller->next = rec->reallocate_callers;
1977         rec->reallocate_callers = caller;
1978
1979         return;
1980 }
1981
1982 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1983 {
1984         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1985         TDB_DATA result;
1986         int32_t ret;
1987         struct ip_reallocate_list *callers;
1988         uint32_t culprit;
1989
1990         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1991
1992         /* update the list of public ips that a node can handle for
1993            all connected nodes
1994         */
1995         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
1996         if (ret != 0) {
1997                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1998                                  culprit));
1999                 rec->need_takeover_run = true;
2000         }
2001         if (ret == 0) {
2002                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2003                 if (ret != 0) {
2004                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: "
2005                                          "ctdb_takeover_run() failed\n"));
2006                         rec->need_takeover_run = true;
2007                 }
2008         }
2009
2010         result.dsize = sizeof(int32_t);
2011         result.dptr  = (uint8_t *)&ret;
2012
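        /* reply to every caller that registered for a response; the int32_t
           result of the reallocation is passed back as the message data */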
2013         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2014
2015                 /* Someone that sent srvid==0 does not want a reply */
2016                 if (callers->rd->srvid == 0) {
2017                         continue;
2018                 }
2019                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2020                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2021                                   (unsigned long long)callers->rd->srvid));
2022                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2023                 if (ret != 0) {
2024                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2025                                          "message to %u:%llu\n",
2026                                          (unsigned)callers->rd->pnn,
2027                                          (unsigned long long)callers->rd->srvid));
2028                 }
2029         }
2030
2031         talloc_free(tmp_ctx);
2032         talloc_free(rec->ip_reallocate_ctx);
2033         rec->ip_reallocate_ctx = NULL;
2034         rec->reallocate_callers = NULL;
2035         
2036 }
2037
2038
2039 /*
2040   handler for recovery master elections
2041 */
2042 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2043                              TDB_DATA data, void *private_data)
2044 {
2045         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2046         int ret;
2047         struct election_message *em = (struct election_message *)data.dptr;
2048         TALLOC_CTX *mem_ctx;
2049
2050         /* we got an election packet - update the timeout for the election */
2051         talloc_free(rec->election_timeout);
2052         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2053                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2054                                                 ctdb_election_timeout, rec);
2055
2056         mem_ctx = talloc_new(ctdb);
2057
2058         /* someone called an election. check their election data;
2059            if we disagree and would rather be the elected node,
2060            send a new election message to all other nodes
2061          */
2062         if (ctdb_election_win(rec, em)) {
2063                 if (!rec->send_election_te) {
2064                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2065                                                                 timeval_current_ofs(0, 500000),
2066                                                                 election_send_request, rec);
2067                 }
2068                 talloc_free(mem_ctx);
2069                 /*unban_all_nodes(ctdb);*/
2070                 return;
2071         }
2072         
2073         /* we didn't win */
2074         talloc_free(rec->send_election_te);
2075         rec->send_election_te = NULL;
2076
2077         if (ctdb->tunable.verify_recovery_lock != 0) {
2078                 /* release the recmaster lock */
2079                 if (em->pnn != ctdb->pnn &&
2080                     ctdb->recovery_lock_fd != -1) {
2081                         close(ctdb->recovery_lock_fd);
2082                         ctdb->recovery_lock_fd = -1;
2083                         unban_all_nodes(ctdb);
2084                 }
2085         }
2086
2087         /* ok, let that guy become recmaster then */
2088         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2089         if (ret != 0) {
2090                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2091                 talloc_free(mem_ctx);
2092                 return;
2093         }
2094
2095         talloc_free(mem_ctx);
2096         return;
2097 }
2098
2099
2100 /*
2101   force the start of the election process
2102  */
2103 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2104                            struct ctdb_node_map *nodemap)
2105 {
2106         int ret;
2107         struct ctdb_context *ctdb = rec->ctdb;
2108
2109         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2110
2111         /* set all nodes to recovery mode to stop all internode traffic */
2112         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2113         if (ret != 0) {
2114                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2115                 return;
2116         }
2117
2118         talloc_free(rec->election_timeout);
2119         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2120                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2121                                                 ctdb_election_timeout, rec);
2122
2123         ret = send_election_request(rec, pnn, true);
2124         if (ret!=0) {
2125                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2126                 return;
2127         }
2128
2129         /* wait for a few seconds to collect all responses */
2130         ctdb_wait_election(rec);
2131 }
2132
2133
2134
2135 /*
2136   handler for when a node changes its flags
2137 */
2138 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2139                             TDB_DATA data, void *private_data)
2140 {
2141         int ret;
2142         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2143         struct ctdb_node_map *nodemap=NULL;
2144         TALLOC_CTX *tmp_ctx;
2145         uint32_t changed_flags;
2146         int i;
2147         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2148         int disabled_flag_changed;
2149
2150         if (data.dsize != sizeof(*c)) {
2151                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2152                 return;
2153         }
2154
2155         tmp_ctx = talloc_new(ctdb);
2156         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2157
2158         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2159         if (ret != 0) {
2160                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2161                 talloc_free(tmp_ctx);
2162                 return;         
2163         }
2164
2165
2166         for (i=0;i<nodemap->num;i++) {
2167                 if (nodemap->nodes[i].pnn == c->pnn) break;
2168         }
2169
2170         if (i == nodemap->num) {
2171                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2172                 talloc_free(tmp_ctx);
2173                 return;
2174         }
2175
2176         changed_flags = c->old_flags ^ c->new_flags;
2177
2178         if (nodemap->nodes[i].flags != c->new_flags) {
2179                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2180         }
2181
2182         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2183
2184         nodemap->nodes[i].flags = c->new_flags;
2185
2186         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2187                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2188
2189         if (ret == 0) {
2190                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2191                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2192         }
2193         
2194         if (ret == 0 &&
2195             ctdb->recovery_master == ctdb->pnn &&
2196             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2197                 /* Only do the takeover run if the perm disabled or unhealthy
2198                    flags changed since these will cause an ip failover but not
2199                    a recovery.
2200                    If the node became disconnected or banned this will also
2201                    lead to an ip address failover but that is handled 
2202                    during recovery
2203                 */
2204                 if (disabled_flag_changed) {
2205                         rec->need_takeover_run = true;
2206                 }
2207         }
2208
2209         talloc_free(tmp_ctx);
2210 }
2211
2212 /*
2213   handler for when we need to push out flag changes to all other nodes
2214 */
2215 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2216                             TDB_DATA data, void *private_data)
2217 {
2218         int ret;
2219         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2220         struct ctdb_node_map *nodemap=NULL;
2221         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2222         uint32_t recmaster;
2223         uint32_t *nodes;
2224
2225         /* find the recovery master */
2226         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2227         if (ret != 0) {
2228                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2229                 talloc_free(tmp_ctx);
2230                 return;
2231         }
2232
2233         /* read the node flags from the recmaster */
2234         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2235         if (ret != 0) {
2236                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2237                 talloc_free(tmp_ctx);
2238                 return;
2239         }
2240         if (c->pnn >= nodemap->num) {
2241                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2242                 talloc_free(tmp_ctx);
2243                 return;
2244         }
2245
2246         /* send the flags update to all connected nodes */
2247         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2248
2249         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2250                                       nodes, 0, CONTROL_TIMEOUT(),
2251                                       false, data,
2252                                       NULL, NULL,
2253                                       NULL) != 0) {
2254                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2255
2256                 talloc_free(tmp_ctx);
2257                 return;
2258         }
2259
2260         talloc_free(tmp_ctx);
2261 }
2262
2263
2264 struct verify_recmode_normal_data {
2265         uint32_t count;
2266         enum monitor_result status;
2267 };
2268
2269 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2270 {
2271         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2272
2273
2274         /* one more node has responded with recmode data*/
2275         rmdata->count--;
2276
2277         /* if we failed to get the recmode, then return an error and let
2278            the main loop try again.
2279         */
2280         if (state->state != CTDB_CONTROL_DONE) {
2281                 if (rmdata->status == MONITOR_OK) {
2282                         rmdata->status = MONITOR_FAILED;
2283                 }
2284                 return;
2285         }
2286
2287         /* if we got a response, then the recmode will be stored in the
2288            status field
2289         */
2290         if (state->status != CTDB_RECOVERY_NORMAL) {
2291                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2292                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2293         }
2294
2295         return;
2296 }
2297
2298
2299 /* verify that all nodes are in normal recovery mode */
2300 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2301 {
2302         struct verify_recmode_normal_data *rmdata;
2303         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2304         struct ctdb_client_control_state *state;
2305         enum monitor_result status;
2306         int j;
2307         
2308         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2309         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2310         rmdata->count  = 0;
2311         rmdata->status = MONITOR_OK;
2312
2313         /* loop over all active nodes and send an async getrecmode call to 
2314            them*/
2315         for (j=0; j<nodemap->num; j++) {
2316                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2317                         continue;
2318                 }
2319                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2320                                         CONTROL_TIMEOUT(), 
2321                                         nodemap->nodes[j].pnn);
2322                 if (state == NULL) {
2323                         /* we failed to send the control, treat this as 
2324                            an error and try again next iteration
2325                         */                      
2326                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2327                         talloc_free(mem_ctx);
2328                         return MONITOR_FAILED;
2329                 }
2330
2331                 /* set up the callback functions */
2332                 state->async.fn = verify_recmode_normal_callback;
2333                 state->async.private_data = rmdata;
2334
2335                 /* one more control to wait for to complete */
2336                 rmdata->count++;
2337         }
2338
2339
2340         /* now wait for up to the maximum number of seconds allowed
2341            or until all nodes we expect a response from have replied
2342         */
2343         while (rmdata->count > 0) {
2344                 event_loop_once(ctdb->ev);
2345         }
2346
2347         status = rmdata->status;
2348         talloc_free(mem_ctx);
2349         return status;
2350 }
2351
2352
2353 struct verify_recmaster_data {
2354         struct ctdb_recoverd *rec;
2355         uint32_t count;
2356         uint32_t pnn;
2357         enum monitor_result status;
2358 };
2359
2360 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2361 {
2362         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2363
2364
2365         /* one more node has responded with recmaster data*/
2366         rmdata->count--;
2367
2368         /* if we failed to get the recmaster, then return an error and let
2369            the main loop try again.
2370         */
2371         if (state->state != CTDB_CONTROL_DONE) {
2372                 if (rmdata->status == MONITOR_OK) {
2373                         rmdata->status = MONITOR_FAILED;
2374                 }
2375                 return;
2376         }
2377
2378         /* if we got a response, then the recmaster will be stored in the
2379            status field
2380         */
2381         if (state->status != rmdata->pnn) {
2382                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2383                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2384                 rmdata->status = MONITOR_ELECTION_NEEDED;
2385         }
2386
2387         return;
2388 }
2389
2390
2391 /* verify that all nodes agree that we are the recmaster */
2392 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2393 {
2394         struct ctdb_context *ctdb = rec->ctdb;
2395         struct verify_recmaster_data *rmdata;
2396         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2397         struct ctdb_client_control_state *state;
2398         enum monitor_result status;
2399         int j;
2400         
2401         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2402         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2403         rmdata->rec    = rec;
2404         rmdata->count  = 0;
2405         rmdata->pnn    = pnn;
2406         rmdata->status = MONITOR_OK;
2407
2408         /* loop over all active nodes and send an async getrecmaster call to 
2409            them*/
2410         for (j=0; j<nodemap->num; j++) {
2411                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2412                         continue;
2413                 }
2414                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2415                                         CONTROL_TIMEOUT(),
2416                                         nodemap->nodes[j].pnn);
2417                 if (state == NULL) {
2418                         /* we failed to send the control, treat this as 
2419                            an error and try again next iteration
2420                         */                      
2421                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2422                         talloc_free(mem_ctx);
2423                         return MONITOR_FAILED;
2424                 }
2425
2426                 /* set up the callback functions */
2427                 state->async.fn = verify_recmaster_callback;
2428                 state->async.private_data = rmdata;
2429
2430                 /* one more control to wait for to complete */
2431                 rmdata->count++;
2432         }
2433
2434
2435         /* now wait for up to the maximum number of seconds allowed
2436            or until all nodes we expect a response from have replied
2437         */
2438         while (rmdata->count > 0) {
2439                 event_loop_once(ctdb->ev);
2440         }
2441
2442         status = rmdata->status;
2443         talloc_free(mem_ctx);
2444         return status;
2445 }
2446
2447
2448 /* called to check that the local allocation of public ip addresses is ok.
2449 */
2450 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2451 {
2452         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2453         struct ctdb_control_get_ifaces *ifaces = NULL;
2454         struct ctdb_all_public_ips *ips = NULL;
2455         struct ctdb_uptime *uptime1 = NULL;
2456         struct ctdb_uptime *uptime2 = NULL;
2457         int ret, j;
2458         bool need_iface_check = false;
2459         bool need_takeover_run = false;
2460
2461         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2462                                 CTDB_CURRENT_NODE, &uptime1);
2463         if (ret != 0) {
2464                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2465                 talloc_free(mem_ctx);
2466                 return -1;
2467         }
2468
2469
2470         /* read the interfaces from the local node */
2471         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2472         if (ret != 0) {
2473                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2474                 talloc_free(mem_ctx);
2475                 return -1;
2476         }
2477
2478         if (!rec->ifaces) {
2479                 need_iface_check = true;
2480         } else if (rec->ifaces->num != ifaces->num) {
2481                 need_iface_check = true;
2482         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2483                 need_iface_check = true;
2484         }
2485
2486         if (need_iface_check) {
2487                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2488                                      "local node %u - force takeover run\n",
2489                                      pnn));
2490                 need_takeover_run = true;
2491         }
2492
2493         /* read the ip allocation from the local node */
2494         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2495         if (ret != 0) {
2496                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2497                 talloc_free(mem_ctx);
2498                 return -1;
2499         }
2500
2501         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2502                                 CTDB_CURRENT_NODE, &uptime2);
2503         if (ret != 0) {
2504                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2505                 talloc_free(mem_ctx);
2506                 return -1;
2507         }
2508
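        /* the uptime was sampled both before and after reading the public
           ip list; if a recovery started or finished in between, the list
           may be stale, so the checks below are skipped for this round */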
2509         /* skip the check if the startrecovery time has changed */
2510         if (timeval_compare(&uptime1->last_recovery_started,
2511                             &uptime2->last_recovery_started) != 0) {
2512                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2513                 talloc_free(mem_ctx);
2514                 return 0;
2515         }
2516
2517         /* skip the check if the endrecovery time has changed */
2518         if (timeval_compare(&uptime1->last_recovery_finished,
2519                             &uptime2->last_recovery_finished) != 0) {
2520                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2521                 talloc_free(mem_ctx);
2522                 return 0;
2523         }
2524
2525         /* skip the check if we have started but not finished recovery */
2526         if (timeval_compare(&uptime1->last_recovery_finished,
2527                             &uptime1->last_recovery_started) != 1) {
2528                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2529                 talloc_free(mem_ctx);
2530
2531                 return 0;
2532         }
2533
2534         talloc_free(rec->ifaces);
2535         rec->ifaces = talloc_steal(rec, ifaces);
2536
2537         /* verify that we have the ip addresses we should have
2538            and that we don't have ones we shouldn't have.
2539            if we find an inconsistency we ask the recmaster
2540            to trigger a takeover run so the public address
2541            assignment gets fixed up
2542         */
2543         for (j=0; j<ips->num; j++) {
2544                 if (ips->ips[j].pnn == pnn) {
2545                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2546                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2547                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2548                                 need_takeover_run = true;
2549                         }
2550                 } else {
2551                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2552                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2553                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2554                                 need_takeover_run = true;
2555                         }
2556                 }
2557         }
2558
2559         if (need_takeover_run) {
2560                 struct takeover_run_reply rd;
2561                 TDB_DATA data;
2562
2563                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2564
2565                 rd.pnn = ctdb->pnn;
2566                 rd.srvid = 0;
2567                 data.dptr = (uint8_t *)&rd;
2568                 data.dsize = sizeof(rd);
2569
2570                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2571                 if (ret != 0) {
2572                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2573                 }
2574         }
2575         talloc_free(mem_ctx);
2576         return 0;
2577 }
2578
2579
2580 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2581 {
2582         struct ctdb_node_map **remote_nodemaps = callback_data;
2583
2584         if (node_pnn >= ctdb->num_nodes) {
2585                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2586                 return;
2587         }
2588
2589         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2590
2591 }
2592
2593 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2594         struct ctdb_node_map *nodemap,
2595         struct ctdb_node_map **remote_nodemaps)
2596 {
2597         uint32_t *nodes;
2598
2599         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2600         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2601                                         nodes, 0,
2602                                         CONTROL_TIMEOUT(), false, tdb_null,
2603                                         async_getnodemap_callback,
2604                                         NULL,
2605                                         remote_nodemaps) != 0) {
2606                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2607
2608                 return -1;
2609         }
2610
2611         return 0;
2612 }
2613
2614 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2615 struct ctdb_check_reclock_state {
2616         struct ctdb_context *ctdb;
2617         struct timeval start_time;
2618         int fd[2];
2619         pid_t child;
2620         struct timed_event *te;
2621         struct fd_event *fde;
2622         enum reclock_child_status status;
2623 };
2624
2625 /* when we free the reclock state we must kill any child process.
2626 */
2627 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2628 {
2629         struct ctdb_context *ctdb = state->ctdb;
2630
2631         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2632
2633         if (state->fd[0] != -1) {
2634                 close(state->fd[0]);
2635                 state->fd[0] = -1;
2636         }
2637         if (state->fd[1] != -1) {
2638                 close(state->fd[1]);
2639                 state->fd[1] = -1;
2640         }
2641         kill(state->child, SIGKILL);
2642         return 0;
2643 }
2644
2645 /*
2646   called if our check_reclock child times out. this would happen if
2647   i/o to the reclock file blocks.
2648  */
2649 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2650                                          struct timeval t, void *private_data)
2651 {
2652         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2653                                            struct ctdb_check_reclock_state);
2654
2655         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out. CFS slow to grant locks?\n"));
2656         state->status = RECLOCK_TIMEOUT;
2657 }
2658
2659 /* this is called when the child process has completed checking the reclock
2660    file and has written data back to us through the pipe.
2661 */
2662 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2663                              uint16_t flags, void *private_data)
2664 {
2665         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2666                                              struct ctdb_check_reclock_state);
2667         char c = 0;
2668         int ret;
2669
2670         /* we got a response from our child process so we can abort the
2671            timeout.
2672         */
2673         talloc_free(state->te);
2674         state->te = NULL;
2675
2676         ret = read(state->fd[0], &c, 1);
2677         if (ret != 1 || c != RECLOCK_OK) {
2678                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2679                 state->status = RECLOCK_FAILED;
2680
2681                 return;
2682         }
2683
2684         state->status = RECLOCK_OK;
2685         return;
2686 }
2687
2688 static int check_recovery_lock(struct ctdb_context *ctdb)
2689 {
2690         int ret;
2691         struct ctdb_check_reclock_state *state;
2692         pid_t parent = getpid();
2693
2694         if (ctdb->recovery_lock_fd == -1) {
2695                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2696                 return -1;
2697         }
2698
2699         state = talloc(ctdb, struct ctdb_check_reclock_state);
2700         CTDB_NO_MEMORY(ctdb, state);
2701
2702         state->ctdb = ctdb;
2703         state->start_time = timeval_current();
2704         state->status = RECLOCK_CHECKING;
2705         state->fd[0] = -1;
2706         state->fd[1] = -1;
2707
2708         ret = pipe(state->fd);
2709         if (ret != 0) {
2710                 talloc_free(state);
2711                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2712                 return -1;
2713         }
2714
2715         state->child = fork();
2716         if (state->child == (pid_t)-1) {
2717                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed for check_reclock child\n"));
2718                 close(state->fd[0]);
2719                 state->fd[0] = -1;
2720                 close(state->fd[1]);
2721                 state->fd[1] = -1;
2722                 talloc_free(state);
2723                 return -1;
2724         }
2725
2726         if (state->child == 0) {
2727                 char cc = RECLOCK_OK;
2728                 close(state->fd[0]);
2729                 state->fd[0] = -1;
2730
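                /* in the child: try to read one byte through the held lock
                   fd; a read error means the lock or the cluster filesystem
                   is no longer usable */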
2731                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2732                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2733                         cc = RECLOCK_FAILED;
2734                 }
2735
2736                 write(state->fd[1], &cc, 1);
2737                 /* make sure we die when our parent dies */
2738                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2739                         sleep(5);
2740                         write(state->fd[1], &cc, 1);
2741                 }
2742                 _exit(0);
2743         }
2744         close(state->fd[1]);
2745         state->fd[1] = -1;
2746         set_close_on_exec(state->fd[0]);
2747
2748         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2749
2750         talloc_set_destructor(state, check_reclock_destructor);
2751
2752         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2753                                     ctdb_check_reclock_timeout, state);
2754         if (state->te == NULL) {
2755                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2756                 talloc_free(state);
2757                 return -1;
2758         }
2759
2760         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2761                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2762                                 reclock_child_handler,
2763                                 (void *)state);
2764
2765         if (state->fde == NULL) {
2766                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2767                 talloc_free(state);
2768                 return -1;
2769         }
2770
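        /* pump the event loop until either the child's reply or the
           watchdog timeout updates the state */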
2771         while (state->status == RECLOCK_CHECKING) {
2772                 event_loop_once(ctdb->ev);
2773         }
2774
2775         if (state->status == RECLOCK_FAILED) {
2776                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2777                 close(ctdb->recovery_lock_fd);
2778                 ctdb->recovery_lock_fd = -1;
2779                 talloc_free(state);
2780                 return -1;
2781         }
2782
2783         talloc_free(state);
2784         return 0;
2785 }
2786
2787 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2788 {
2789         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2790         const char *reclockfile;
2791
2792         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2793                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2794                 talloc_free(tmp_ctx);
2795                 return -1;      
2796         }
2797
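        /* the daemon no longer has a reclock file configured - forget ours,
           close the lock fd and disable verification */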
2798         if (reclockfile == NULL) {
2799                 if (ctdb->recovery_lock_file != NULL) {
2800                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2801                         talloc_free(ctdb->recovery_lock_file);
2802                         ctdb->recovery_lock_file = NULL;
2803                         if (ctdb->recovery_lock_fd != -1) {
2804                                 close(ctdb->recovery_lock_fd);
2805                                 ctdb->recovery_lock_fd = -1;
2806                         }
2807                 }
2808                 ctdb->tunable.verify_recovery_lock = 0;
2809                 talloc_free(tmp_ctx);
2810                 return 0;
2811         }
2812
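        /* we had no reclock file before - remember the new name and drop
           any stale lock fd */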
2813         if (ctdb->recovery_lock_file == NULL) {
2814                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2815                 if (ctdb->recovery_lock_fd != -1) {
2816                         close(ctdb->recovery_lock_fd);
2817                         ctdb->recovery_lock_fd = -1;
2818                 }
2819                 talloc_free(tmp_ctx);
2820                 return 0;
2821         }
2822
2823
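        /* the reclock file name is unchanged - nothing to do */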
2824         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2825                 talloc_free(tmp_ctx);
2826                 return 0;
2827         }
2828
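        /* the reclock file has changed - remember the new name, disable
           verification and close the old lock fd */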
2829         talloc_free(ctdb->recovery_lock_file);
2830         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2831         ctdb->tunable.verify_recovery_lock = 0;
2832         if (ctdb->recovery_lock_fd != -1) {
2833                 close(ctdb->recovery_lock_fd);
2834                 ctdb->recovery_lock_fd = -1;
2835         }
2836
2837         talloc_free(tmp_ctx);
2838         return 0;
2839 }
2840                 
2841 /*
2842   the main monitoring loop
2843  */
2844 static void monitor_cluster(struct ctdb_context *ctdb)
2845 {
2846         uint32_t pnn;
2847         TALLOC_CTX *mem_ctx=NULL;
2848         struct ctdb_node_map *nodemap=NULL;
2849         struct ctdb_node_map *recmaster_nodemap=NULL;
2850         struct ctdb_node_map **remote_nodemaps=NULL;
2851         struct ctdb_vnn_map *vnnmap=NULL;
2852         struct ctdb_vnn_map *remote_vnnmap=NULL;
2853         int32_t debug_level;
2854         int i, j, ret;
2855         struct ctdb_recoverd *rec;
2856
2857         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2858
2859         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2860         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2861
2862         rec->ctdb = ctdb;
2863
2864         rec->priority_time = timeval_current();
2865
2866         /* register a message port for sending memory dumps */
2867         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2868
2869         /* register a message port for recovery elections */
2870         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2871
2872         /* when nodes are disabled/enabled */
2873         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2874
2875         /* when we are asked to push out a flag change */
2876         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2877
2878         /* register a message port for vacuum fetch */
2879         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2880
2881         /* register a message port for reloadnodes  */
2882         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2883
2884         /* register a message port for performing a takeover run */
2885         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2886
2887         /* register a message port for disabling the ip check for a short while */
2888         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2889
2890         /* register a message port for updating the recovery daemons node assignment for an ip */
2891         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
2892
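        /* main monitoring loop: every pass ends with a 'goto again', so any
           error simply restarts the cycle on the next interval */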
2893 again:
2894         if (mem_ctx) {
2895                 talloc_free(mem_ctx);
2896                 mem_ctx = NULL;
2897         }
2898         mem_ctx = talloc_new(ctdb);
2899         if (!mem_ctx) {
2900                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2901                 exit(-1);
2902         }
2903
2904         /* we only check for recovery once every recover_interval seconds */
2905         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2906
2907         /* verify that the main daemon is still running */
2908         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2909                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2910                 exit(-1);
2911         }
2912
2913         /* ping the local daemon to tell it we are alive */
2914         ctdb_ctrl_recd_ping(ctdb);
2915
2916         if (rec->election_timeout) {
2917                 /* an election is in progress */
2918                 goto again;
2919         }
2920
2921         /* read the debug level from the parent and update locally */
2922         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2923         if (ret !=0) {
2924                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2925                 goto again;
2926         }
2927         LogLevel = debug_level;
2928
2929
2930         /* We must check whether we need to ban a node here, and we want to
2931            do this as early as possible so we don't wait until we have pulled
2932            the node map from the local node. That is why the value 20 is hardcoded.
2933         */
2934         for (i=0; i<ctdb->num_nodes; i++) {
2935                 struct ctdb_banning_state *ban_state;
2936
2937                 if (ctdb->nodes[i]->ban_state == NULL) {
2938                         continue;
2939                 }
2940                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2941                 if (ban_state->count < 20) {
2942                         continue;
2943                 }
2944                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2945                         ctdb->nodes[i]->pnn, ban_state->count,
2946                         ctdb->tunable.recovery_ban_period));
2947                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2948                 ban_state->count = 0;
2949         }
2950
2951         /* get relevant tunables */
2952         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2953         if (ret != 0) {
2954                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2955                 goto again;
2956         }
2957
2958         /* get the current recovery lock file from the server */
2959         if (update_recovery_lock_file(ctdb) != 0) {
2960                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2961                 goto again;
2962         }
2963
2964         /* Make sure that if recovery lock verification becomes disabled,
2965            we close the recovery lock file
2966         */
2967         if (ctdb->tunable.verify_recovery_lock == 0) {
2968                 if (ctdb->recovery_lock_fd != -1) {
2969                         close(ctdb->recovery_lock_fd);
2970                         ctdb->recovery_lock_fd = -1;
2971                 }
2972         }
2973
2974         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2975         if (pnn == (uint32_t)-1) {
2976                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2977                 goto again;
2978         }
2979
2980         /* get the vnnmap */
2981         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2982         if (ret != 0) {
2983                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2984                 goto again;
2985         }
2986
2987
2988         /* get number of nodes */
2989         if (rec->nodemap) {
2990                 talloc_free(rec->nodemap);
2991                 rec->nodemap = NULL;
2992                 nodemap=NULL;
2993         }
2994         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2995         if (ret != 0) {
2996                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2997                 goto again;
2998         }
2999         nodemap = rec->nodemap;
3000
3001         /* check which node is the recovery master */
3002         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3003         if (ret != 0) {
3004                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3005                 goto again;
3006         }
3007
3008         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3009         if (rec->recmaster != pnn) {
3010                 if (rec->ip_reallocate_ctx != NULL) {
3011                         talloc_free(rec->ip_reallocate_ctx);
3012                         rec->ip_reallocate_ctx = NULL;
3013                         rec->reallocate_callers = NULL;
3014                 }
3015         }
3016         /* if there are takeover runs requested, perform them and notify the waiters */
3017         if (rec->reallocate_callers) {
3018                 process_ipreallocate_requests(ctdb, rec);
3019         }
3020
3021         if (rec->recmaster == (uint32_t)-1) {
3022                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master set yet - forcing election\n"));
3023                 force_election(rec, pnn, nodemap);
3024                 goto again;
3025         }
3026
3027
3028         /* if the local daemon is STOPPED, we verify that the databases are
3029            also frozen and that the recmode is set to active
3030         */
3031         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3032                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3033                 if (ret != 0) {
3034                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3035                 }
3036                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3037                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3038
3039                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3040                         if (ret != 0) {
3041                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3042                                 goto again;
3043                         }
3044                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3045                         if (ret != 0) {
3046                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3047
3048                                 goto again;
3049                         }
3050                         goto again;
3051                 }
3052         }
3053         /* If the local node is stopped, verify that we are not the
3054            recmaster and, if we are, yield that role
3055         */
3056         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3057                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3058                 force_election(rec, pnn, nodemap);
3059                 goto again;
3060         }
3061         
3062         /* check that we (the recovery daemon) and the local ctdb daemon
3063            agree on whether we are banned or not
3064         */
3065 /* TODO (qqq): this consistency check is not implemented yet */
3066
3067         /* remember our own node flags */
3068         rec->node_flags = nodemap->nodes[pnn].flags;
3069
3070         /* count how many active nodes there are */
3071         rec->num_active    = 0;
3072         rec->num_connected = 0;
3073         for (i=0; i<nodemap->num; i++) {
3074                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3075                         rec->num_active++;
3076                 }
3077                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3078                         rec->num_connected++;
3079                 }
3080         }
3081
3082
3083         /* verify that the recmaster node is still active */
3084         for (j=0; j<nodemap->num; j++) {
3085                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3086                         break;
3087                 }
3088         }
3089
3090         if (j == nodemap->num) {
3091                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3092                 force_election(rec, pnn, nodemap);
3093                 goto again;
3094         }
3095
3096         /* if recovery master is disconnected we must elect a new recmaster */
3097         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3098                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3099                 force_election(rec, pnn, nodemap);
3100                 goto again;
3101         }
3102
3103         /* grab the nodemap from the recovery master to check if it is banned */
3104         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3105                                    mem_ctx, &recmaster_nodemap);
3106         if (ret != 0) {
3107                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3108                           nodemap->nodes[j].pnn));
3109                 goto again;
3110         }
3111
3112
3113         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3114                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3115                 force_election(rec, pnn, nodemap);
3116                 goto again;
3117         }
3118
3119
3120         /* verify that we have all the ip addresses we should have and that
3121          * we don't have addresses we shouldn't have.
3122          */
3123         if (ctdb->do_checkpublicip) {
3124                 if (rec->ip_check_disable_ctx == NULL) {
3125                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3126                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3127                         }
3128                 }
3129         }
3130
3131
3132         /* if we are not the recmaster then we do not need to check
3133            if recovery is needed
3134          */
3135         if (pnn != rec->recmaster) {
3136                 goto again;
3137         }
3138
3139
3140         /* ensure our local copies of flags are right */
3141         ret = update_local_flags(rec, nodemap);
3142         if (ret == MONITOR_ELECTION_NEEDED) {
3143                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3144                 force_election(rec, pnn, nodemap);
3145                 goto again;
3146         }
3147         if (ret != MONITOR_OK) {
3148                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3149                 goto again;
3150         }
3151
3152         if (ctdb->num_nodes != nodemap->num) {
3153                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3154                 reload_nodes_file(ctdb);
3155                 goto again;
3156         }
3157
3158         /* verify that all active nodes agree that we are the recmaster */
3159         switch (verify_recmaster(rec, nodemap, pnn)) {
3160         case MONITOR_RECOVERY_NEEDED:
3161                 /* can not happen */
3162                 goto again;
3163         case MONITOR_ELECTION_NEEDED:
3164                 force_election(rec, pnn, nodemap);
3165                 goto again;
3166         case MONITOR_OK:
3167                 break;
3168         case MONITOR_FAILED:
3169                 goto again;
3170         }
3171
3172
3173         if (rec->need_recovery) {
3174                 /* a previous recovery didn't finish */
3175                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3176                 goto again;             
3177         }
3178
3179         /* verify that all active nodes are in normal mode 
3180            and not in recovery mode 
3181         */
3182         switch (verify_recmode(ctdb, nodemap)) {
3183         case MONITOR_RECOVERY_NEEDED:
3184                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3185                 goto again;
3186         case MONITOR_FAILED:
3187                 goto again;
3188         case MONITOR_ELECTION_NEEDED:
3189                 /* can not happen */
3190         case MONITOR_OK:
3191                 break;
3192         }
3193
3194
3195         if (ctdb->tunable.verify_recovery_lock != 0) {
3196                 /* we should have the reclock - check it's not stale */
3197                 ret = check_recovery_lock(ctdb);
3198                 if (ret != 0) {
3199                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3200                         ctdb_set_culprit(rec, ctdb->pnn);
3201                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3202                         goto again;
3203                 }
3204         }
3205
3206         /* get the nodemap for all active remote nodes
3207          */
3208         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3209         if (remote_nodemaps == NULL) {
3210                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3211                 goto again;
3212         }
3213         for(i=0; i<nodemap->num; i++) {
3214                 remote_nodemaps[i] = NULL;
3215         }
3216         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3217                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3218                 goto again;
3219         } 
3220
3221         /* verify that all other nodes have the same nodemap as we have
3222         */
3223         for (j=0; j<nodemap->num; j++) {
3224                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3225                         continue;
3226                 }
3227
3228                 if (remote_nodemaps[j] == NULL) {
3229                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3230                         ctdb_set_culprit(rec, j);
3231
3232                         goto again;
3233                 }
3234
3235                 /* if the nodes disagree on how many nodes there are
3236                    then this is a good reason to try recovery
3237                  */
3238                 if (remote_nodemaps[j]->num != nodemap->num) {
3239                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3240                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3241                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3242                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3243                         goto again;
3244                 }
3245
3246                 /* if the nodes disagree on which nodes exist and are
3247                    active, then that is also a good reason to do recovery
3248                  */
3249                 for (i=0;i<nodemap->num;i++) {
3250                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3251                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3252                                           nodemap->nodes[j].pnn, i, 
3253                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3254                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3255                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3256                                             vnnmap);
3257                                 goto again;
3258                         }
3259                 }
3260
3261                 /* verify the flags are consistent
3262                 */
3263                 for (i=0; i<nodemap->num; i++) {
3264                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3265                                 continue;
3266                         }
3267                         
3268                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3269                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3270                                   nodemap->nodes[j].pnn, 
3271                                   nodemap->nodes[i].pnn, 
3272                                   remote_nodemaps[j]->nodes[i].flags,
3273                                   nodemap->nodes[i].flags));
3274                                 if (i == j) {
3275                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3276                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3277                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3278                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3279                                                     vnnmap);
3280                                         goto again;
3281                                 } else {
3282                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3283                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3284                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3285                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3286                                                     vnnmap);
3287                                         goto again;
3288                                 }
3289                         }
3290                 }
3291         }
3292
3293
3294         /* there must be the same number of lmasters in the vnn map
3295            as there are active nodes, or we will have to do a recovery
3296          */
3297         if (vnnmap->size != rec->num_active) {
3298                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3299                           vnnmap->size, rec->num_active));
3300                 ctdb_set_culprit(rec, ctdb->pnn);
3301                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3302                 goto again;
3303         }
3304
3305         /* verify that all active nodes in the nodemap also exist in 
3306            the vnnmap.
3307          */
3308         for (j=0; j<nodemap->num; j++) {
3309                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3310                         continue;
3311                 }
3312                 if (nodemap->nodes[j].pnn == pnn) {
3313                         continue;
3314                 }
3315
3316                 for (i=0; i<vnnmap->size; i++) {
3317                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3318                                 break;
3319                         }
3320                 }
3321                 if (i == vnnmap->size) {
3322                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3323                                   nodemap->nodes[j].pnn));
3324                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3325                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3326                         goto again;
3327                 }
3328         }
3329
3330         
3331         /* verify that all other nodes have the same vnnmap
3332            and are from the same generation
3333          */
3334         for (j=0; j<nodemap->num; j++) {
3335                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3336                         continue;
3337                 }
3338                 if (nodemap->nodes[j].pnn == pnn) {
3339                         continue;
3340                 }
3341
3342                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3343                                           mem_ctx, &remote_vnnmap);
3344                 if (ret != 0) {
3345                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3346                                   nodemap->nodes[j].pnn));
3347                         goto again;
3348                 }
3349
3350                 /* verify the vnnmap generation is the same */
3351                 if (vnnmap->generation != remote_vnnmap->generation) {
3352                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3353                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3354                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3355                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3356                         goto again;
3357                 }
3358
3359                 /* verify the vnnmap size is the same */
3360                 if (vnnmap->size != remote_vnnmap->size) {
3361                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3362                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3363                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3364                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3365                         goto again;
3366                 }
3367
3368                 /* verify the vnnmap is the same */
3369                 for (i=0;i<vnnmap->size;i++) {
3370                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3371                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3372                                           nodemap->nodes[j].pnn));
3373                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3374                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3375                                             vnnmap);
3376                                 goto again;
3377                         }
3378                 }
3379         }
3380
3381         /* we might need to change who has what IP assigned */
3382         if (rec->need_takeover_run) {
3383                 uint32_t culprit = (uint32_t)-1;
3384
3385                 rec->need_takeover_run = false;
3386
3387                 /* update the list of public ips that a node can handle for
3388                    all connected nodes
3389                 */
3390                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3391                 if (ret != 0) {
3392                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3393                                          culprit));
3394                         ctdb_set_culprit(rec, culprit);
3395                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3396                         goto again;
3397                 }
3398
3399                 /* execute the "startrecovery" event script on all nodes */
3400                 ret = run_startrecovery_eventscript(rec, nodemap);
3401                 if (ret!=0) {
3402                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3403                         ctdb_set_culprit(rec, ctdb->pnn);
3404                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3405                         goto again;
3406                 }
3407
3408                 ret = ctdb_takeover_run(ctdb, nodemap);
3409                 if (ret != 0) {
3410                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3411                         ctdb_set_culprit(rec, ctdb->pnn);
3412                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3413                         goto again;
3414                 }
3415
3416                 /* execute the "recovered" event script on all nodes */
3417                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3418 #if 0
3419 // we can't check whether the event completed successfully
3420 // since this script WILL fail if the node is in recovery mode
3421 // and if that race happens, the code here would just cause a second
3422 // cascading recovery.
3423                 if (ret!=0) {
3424                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3425                         ctdb_set_culprit(rec, ctdb->pnn);
3426                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3427                 }
3428 #endif
3429         }
3430
3431
3432         goto again;
3433
3434 }
3435
3436 /*
3437   event handler for when the main ctdbd dies
3438  */
3439 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3440                                  uint16_t flags, void *private_data)
3441 {
3442         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3443         _exit(1);
3444 }
3445
3446 /*
3447   called regularly to verify that the recovery daemon is still running
3448  */
3449 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3450                               struct timeval yt, void *p)
3451 {
3452         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3453
3454         if (kill(ctdb->recoverd_pid, 0) != 0) {
3455                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3456
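                /* the recovery daemon is gone: stop monitoring and keepalives,
                   release all public IPs, run the shutdown event and exit */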
3457                 ctdb_stop_recoverd(ctdb);
3458                 ctdb_stop_keepalive(ctdb);
3459                 ctdb_stop_monitoring(ctdb);
3460                 ctdb_release_all_ips(ctdb);
3461                 if (ctdb->methods != NULL) {
3462                         ctdb->methods->shutdown(ctdb);
3463                 }
3464                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3465
3466                 exit(10);       
3467         }
3468
3469         event_add_timed(ctdb->ev, ctdb, 
3470                         timeval_current_ofs(30, 0),
3471                         ctdb_check_recd, ctdb);
3472 }
3473
3474 static void recd_sig_child_handler(struct event_context *ev,
3475         struct signal_event *se, int signum, int count,
3476         void *dont_care, 
3477         void *private_data)
3478 {
3479 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3480         int status;
3481         pid_t pid = -1;
3482
3483         while (pid != 0) {
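        /* reap every child that has exited, without blocking */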
3484                 pid = waitpid(-1, &status, WNOHANG);
3485                 if (pid == -1) {
3486                         if (errno != ECHILD) {
3487                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3488                         }
3489                         return;
3490                 }
3491                 if (pid > 0) {
3492                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3493                 }
3494         }
3495 }
3496
3497 /*
3498   startup the recovery daemon as a child of the main ctdb daemon
3499  */
3500 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3501 {
3502         int fd[2];
3503         struct signal_event *se;
3504
3505         if (pipe(fd) != 0) {
3506                 return -1;
3507         }
3508
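        /* the pipe is only used to detect the death of the main daemon:
           the parent keeps the write end open, and the child exits as soon
           as the read end becomes readable (EOF when the parent dies) */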
3509         ctdb->ctdbd_pid = getpid();
3510
3511         ctdb->recoverd_pid = fork();
3512         if (ctdb->recoverd_pid == -1) {
3513                 return -1;
3514         }
3515         
3516         if (ctdb->recoverd_pid != 0) {
3517                 close(fd[0]);
3518                 event_add_timed(ctdb->ev, ctdb, 
3519                                 timeval_current_ofs(30, 0),
3520                                 ctdb_check_recd, ctdb);
3521                 return 0;
3522         }
3523
3524         close(fd[1]);
3525
3526         srandom(getpid() ^ time(NULL));
3527
3528         if (switch_from_server_to_client(ctdb) != 0) {
3529                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3530                 exit(1);
3531         }
3532
3533         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3534
3535         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3536                      ctdb_recoverd_parent, &fd[0]);     
3537
3538         /* set up a handler to pick up sigchld */
3539         se = event_add_signal(ctdb->ev, ctdb,
3540                                      SIGCHLD, 0,
3541                                      recd_sig_child_handler,
3542                                      ctdb);
3543         if (se == NULL) {
3544                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3545                 exit(1);
3546         }
3547
3548         monitor_cluster(ctdb);
3549
3550         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3551         return -1;
3552 }
3553
3554 /*
3555   shutdown the recovery daemon
3556  */
3557 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3558 {
3559         if (ctdb->recoverd_pid == 0) {
3560                 return;
3561         }
3562
3563         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3564         kill(ctdb->recoverd_pid, SIGTERM);
3565 }