[rusty/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
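/* per-node culprit accounting: how many times a node has recently been
   blamed for recovery problems, used to decide when the node should be banned */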
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
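/* absolute deadlines for controls and for the monitoring loop, derived from
   the recover_timeout and recover_interval tunables */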
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %u but num_nodes is %u\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
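/* callback for CTDB_CONTROL_GET_CAPABILITIES: record the capability bits
   reported by each node in our local node array */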
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all active nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* when entering recovery, freeze all databases (one priority band at a time) on every active node */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   set the recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have.
348    This can fail if the remote node has not yet been upgraded to
349    support this control, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* don't check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid,
435                                                   mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
441                                                  mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* don't check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
497                                                   remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
504                                                  remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
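                        /* keep our existing copy unless the pulled record has a
                           higher rsn, or the same rsn while our copy's dmaster is
                           not the recovery master */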
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* don't merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* don't push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
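                /* a no-op call (CTDB_NULL_FUNC) with CTDB_IMMEDIATE_MIGRATION set
                   is used purely to migrate the record back to this node */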
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* it's already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
891 {
892         uint32_t timed_out = 0;
893         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
894         while (!timed_out) {
895                 event_loop_once(ctdb->ev);
896         }
897 }
898
899 /*
900   called when an election times out (ends)
901  */
902 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
903                                   struct timeval t, void *p)
904 {
905         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
906         rec->election_timeout = NULL;
907
908         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
909 }
910
911
912 /*
913   wait for an election to finish. It finishes election_timeout seconds after
914   the last election packet is received
915  */
916 static void ctdb_wait_election(struct ctdb_recoverd *rec)
917 {
918         struct ctdb_context *ctdb = rec->ctdb;
919         while (rec->election_timeout) {
920                 event_loop_once(ctdb->ev);
921         }
922 }
923
924 /*
925   Update our local flags from all remote connected nodes. 
926   This is only run when we are, or we believe we are, the recovery master
927  */
928 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
929 {
930         int j;
931         struct ctdb_context *ctdb = rec->ctdb;
932         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
933
934         /* get the nodemap for all active remote nodes and verify
935            they are the same as for this node
936          */
937         for (j=0; j<nodemap->num; j++) {
938                 struct ctdb_node_map *remote_nodemap=NULL;
939                 int ret;
940
941                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
942                         continue;
943                 }
944                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
945                         continue;
946                 }
947
948                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
949                                            mem_ctx, &remote_nodemap);
950                 if (ret != 0) {
951                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
952                                   nodemap->nodes[j].pnn));
953                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
954                         talloc_free(mem_ctx);
955                         return MONITOR_FAILED;
956                 }
957                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
958                         /* We should tell our daemon about this so it
959                            updates its flags or else we will log the same 
960                            message again in the next iteration of recovery.
961                            Since we are the recovery master we can just as
962                            well update the flags on all nodes.
963                         */
964                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
965                         if (ret != 0) {
966                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
967                                 talloc_free(mem_ctx);
                                return MONITOR_FAILED;
968                         }
969
970                         /* Update our local copy of the flags in the recovery
971                            daemon.
972                         */
973                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
974                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
975                                  nodemap->nodes[j].flags));
976                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
977                 }
978                 talloc_free(remote_nodemap);
979         }
980         talloc_free(mem_ctx);
981         return MONITOR_OK;
982 }
983
984
985 /* Create a new random generation id.
986    The generation id cannot be the INVALID_GENERATION id
987 */
988 static uint32_t new_generation(void)
989 {
990         uint32_t generation;
991
992         while (1) {
993                 generation = random();
994
995                 if (generation != INVALID_GENERATION) {
996                         break;
997                 }
998         }
999
1000         return generation;
1001 }
1002
1003
1004 /*
1005   create a temporary working database
1006  */
1007 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1008 {
1009         char *name;
1010         struct tdb_wrap *recdb;
1011         unsigned tdb_flags;
1012
1013         /* open up the temporary recovery database */
1014         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1015                                ctdb->db_directory_state,
1016                                ctdb->pnn);
1017         if (name == NULL) {
1018                 return NULL;
1019         }
1020         unlink(name);
1021
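        /* the recdb is a throw-away scratch database private to this recovery
           run, so it is opened without locking; mmap is avoided under valgrind */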
1022         tdb_flags = TDB_NOLOCK;
1023         if (ctdb->valgrinding) {
1024                 tdb_flags |= TDB_NOMMAP;
1025         }
1026         tdb_flags |= TDB_DISALLOW_NESTING;
1027
1028         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1029                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1030         if (recdb == NULL) {
1031                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1032         }
1033
1034         talloc_free(name);
1035
1036         return recdb;
1037 }
1038
1039
1040 /* 
1041    a traverse function for pulling all relevant records from recdb
1042  */
1043 struct recdb_data {
1044         struct ctdb_context *ctdb;
1045         struct ctdb_marshall_buffer *recdata;
1046         uint32_t len;
1047         bool failed;
1048         bool persistent;
1049 };
1050
1051 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1052 {
1053         struct recdb_data *params = (struct recdb_data *)p;
1054         struct ctdb_rec_data *rec;
1055         struct ctdb_ltdb_header *hdr;
1056
1057         /* skip empty records */
1058         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1059                 return 0;
1060         }
1061
1062         /* update the dmaster field to point to us */
1063         hdr = (struct ctdb_ltdb_header *)data.dptr;
1064         if (!params->persistent) {
1065                 hdr->dmaster = params->ctdb->pnn;
1066         }
1067
1068         /* add the record to the blob ready to send to the nodes */
1069         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1070         if (rec == NULL) {
1071                 params->failed = true;
1072                 return -1;
1073         }
1074         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1075         if (params->recdata == NULL) {
1076                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1077                          rec->length + params->len, params->recdata->count));
1078                 params->failed = true;
1079                 return -1;
1080         }
1081         params->recdata->count++;
1082         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1083         params->len += rec->length;
1084         talloc_free(rec);
1085
1086         return 0;
1087 }
1088
1089 /*
1090   push the recdb database out to all nodes
1091  */
1092 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1093                                bool persistent,
1094                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1095 {
1096         struct recdb_data params;
1097         struct ctdb_marshall_buffer *recdata;
1098         TDB_DATA outdata;
1099         TALLOC_CTX *tmp_ctx;
1100         uint32_t *nodes;
1101
1102         tmp_ctx = talloc_new(ctdb);
1103         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1104
1105         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1106         CTDB_NO_MEMORY(ctdb, recdata);
1107
1108         recdata->db_id = dbid;
1109
1110         params.ctdb = ctdb;
1111         params.recdata = recdata;
1112         params.len = offsetof(struct ctdb_marshall_buffer, data);
1113         params.failed = false;
1114         params.persistent = persistent;
1115
1116         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1118                 talloc_free(params.recdata);
1119                 talloc_free(tmp_ctx);
1120                 return -1;
1121         }
1122
1123         if (params.failed) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1125                 talloc_free(params.recdata);
1126                 talloc_free(tmp_ctx);
1127                 return -1;              
1128         }
1129
1130         recdata = params.recdata;
1131
1132         outdata.dptr = (void *)recdata;
1133         outdata.dsize = params.len;
1134
1135         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1136         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1137                                         nodes, 0,
1138                                         CONTROL_TIMEOUT(), false, outdata,
1139                                         NULL, NULL,
1140                                         NULL) != 0) {
1141                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1142                 talloc_free(recdata);
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1148                   dbid, recdata->count));
1149
1150         talloc_free(recdata);
1151         talloc_free(tmp_ctx);
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   go through a full recovery on one database 
1159  */
1160 static int recover_database(struct ctdb_recoverd *rec, 
1161                             TALLOC_CTX *mem_ctx,
1162                             uint32_t dbid,
1163                             bool persistent,
1164                             uint32_t pnn, 
1165                             struct ctdb_node_map *nodemap,
1166                             uint32_t transaction_id)
1167 {
1168         struct tdb_wrap *recdb;
1169         int ret;
1170         struct ctdb_context *ctdb = rec->ctdb;
1171         TDB_DATA data;
1172         struct ctdb_control_wipe_database w;
1173         uint32_t *nodes;
1174
1175         recdb = create_recdb(ctdb, mem_ctx);
1176         if (recdb == NULL) {
1177                 return -1;
1178         }
1179
1180         /* pull all remote databases onto the recdb */
1181         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1182         if (ret != 0) {
1183                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1184                 return -1;
1185         }
1186
1187         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1188
1189         /* wipe all the remote databases. This is safe as we are in a transaction */
1190         w.db_id = dbid;
1191         w.transaction_id = transaction_id;
1192
1193         data.dptr = (void *)&w;
1194         data.dsize = sizeof(w);
1195
1196         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1198                                         nodes, 0,
1199                                         CONTROL_TIMEOUT(), false, data,
1200                                         NULL, NULL,
1201                                         NULL) != 0) {
1202                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1203                 talloc_free(recdb);
1204                 return -1;
1205         }
1206         
1207         /* push out the correct database. This sets the dmaster and skips 
1208            the empty records */
1209         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1210         if (ret != 0) {
1211                 talloc_free(recdb);
1212                 return -1;
1213         }
1214
1215         /* all done with this database */
1216         talloc_free(recdb);
1217
1218         return 0;
1219 }
1220
1221 /*
1222   reload the nodes file 
1223 */
1224 static void reload_nodes_file(struct ctdb_context *ctdb)
1225 {
1226         ctdb->nodes = NULL;
1227         ctdb_load_nodes_file(ctdb);
1228 }
1229
1230 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1231                                          struct ctdb_recoverd *rec,
1232                                          struct ctdb_node_map *nodemap,
1233                                          uint32_t *culprit)
1234 {
1235         int j;
1236         int ret;
1237
1238         if (ctdb->num_nodes != nodemap->num) {
1239                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1240                                   ctdb->num_nodes, nodemap->num));
1241                 if (culprit) {
1242                         *culprit = ctdb->pnn;
1243                 }
1244                 return -1;
1245         }
1246
1247         for (j=0; j<nodemap->num; j++) {
1248                 /* release any existing data */
1249                 if (ctdb->nodes[j]->known_public_ips) {
1250                         talloc_free(ctdb->nodes[j]->known_public_ips);
1251                         ctdb->nodes[j]->known_public_ips = NULL;
1252                 }
1253                 if (ctdb->nodes[j]->available_public_ips) {
1254                         talloc_free(ctdb->nodes[j]->available_public_ips);
1255                         ctdb->nodes[j]->available_public_ips = NULL;
1256                 }
1257
1258                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1259                         continue;
1260                 }
1261
1262                 /* grab a new shiny list of public ips from the node */
1263                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1264                                         CONTROL_TIMEOUT(),
1265                                         ctdb->nodes[j]->pnn,
1266                                         ctdb->nodes,
1267                                         0,
1268                                         &ctdb->nodes[j]->known_public_ips);
1269                 if (ret != 0) {
1270                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1271                                 ctdb->nodes[j]->pnn));
1272                         if (culprit) {
1273                                 *culprit = ctdb->nodes[j]->pnn;
1274                         }
1275                         return -1;
1276                 }
1277
1278                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1279                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1280                         rec->need_takeover_run = true;
1281                 }
1282
1283                 /* grab a new shiny list of public ips from the node */
1284                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1285                                         CONTROL_TIMEOUT(),
1286                                         ctdb->nodes[j]->pnn,
1287                                         ctdb->nodes,
1288                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1289                                         &ctdb->nodes[j]->available_public_ips);
1290                 if (ret != 0) {
1291                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1292                                 ctdb->nodes[j]->pnn));
1293                         if (culprit) {
1294                                 *culprit = ctdb->nodes[j]->pnn;
1295                         }
1296                         return -1;
1297                 }
1298         }
1299
1300         return 0;
1301 }
1302
1303 /*
1304   we are the recmaster, and recovery is needed - start a recovery run
1305  */
1306 static int do_recovery(struct ctdb_recoverd *rec, 
1307                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1308                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1309 {
1310         struct ctdb_context *ctdb = rec->ctdb;
1311         int i, j, ret;
1312         uint32_t generation;
1313         struct ctdb_dbid_map *dbmap;
1314         TDB_DATA data;
1315         uint32_t *nodes;
1316         struct timeval start_time;
1317         uint32_t culprit = (uint32_t)-1;
1318
1319         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1320
1321         /* if recovery fails, force it again */
1322         rec->need_recovery = true;
1323
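        /* ban any node whose culprit count has reached twice the number of
           nodes in the cluster */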
1324         for (i=0; i<ctdb->num_nodes; i++) {
1325                 struct ctdb_banning_state *ban_state;
1326
1327                 if (ctdb->nodes[i]->ban_state == NULL) {
1328                         continue;
1329                 }
1330                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1331                 if (ban_state->count < 2*ctdb->num_nodes) {
1332                         continue;
1333                 }
1334                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1335                         ctdb->nodes[i]->pnn, ban_state->count,
1336                         ctdb->tunable.recovery_ban_period));
1337                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1338                 ban_state->count = 0;
1339         }
1340
1341
1342         if (ctdb->tunable.verify_recovery_lock != 0) {
1343                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1344                 start_time = timeval_current();
1345                 if (!ctdb_recovery_lock(ctdb, true)) {
1346                         ctdb_set_culprit(rec, pnn);
1347                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1348                         return -1;
1349                 }
1350                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1351                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1352         }
1353
1354         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1355
1356         /* get a list of all databases */
1357         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1358         if (ret != 0) {
1359                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
1360                 return -1;
1361         }
1362
1363         /* we do the db creation before we set the recovery mode, so the freeze happens
1364            on all databases we will be dealing with. */
1365
1366         /* verify that we have all the databases any other node has */
1367         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1368         if (ret != 0) {
1369                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1370                 return -1;
1371         }
1372
1373         /* verify that all other nodes have all our databases */
1374         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1375         if (ret != 0) {
1376                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1377                 return -1;
1378         }
1379         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1380
1381         /* update the database priority for all remote databases */
1382         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1383         if (ret != 0) {
1384                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1385         }
1386         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1387
1388
1389         /* set recovery mode to active on all nodes */
1390         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1391         if (ret != 0) {
1392                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1393                 return -1;
1394         }
1395
1396         /* execute the "startrecovery" event script on all nodes */
1397         ret = run_startrecovery_eventscript(rec, nodemap);
1398         if (ret!=0) {
1399                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1400                 return -1;
1401         }
1402
1403         /*
1404           update all nodes to have the same flags that we have
1405          */
1406         for (i=0;i<nodemap->num;i++) {
1407                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1408                         continue;
1409                 }
1410
1411                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1412                 if (ret != 0) {
1413                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1414                         return -1;
1415                 }
1416         }
1417
1418         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1419
1420         /* pick a new generation number */
1421         generation = new_generation();
1422
1423         /* change the vnnmap on this node to use the new generation
1424            number, but not on any other nodes.
1425            This guarantees that if we abort the recovery prematurely
1426            for some reason (e.g. a node stops responding) we can just
1427            return immediately and we will re-enter recovery again
1428            shortly.
1429            I.e. we deliberately leave the cluster with an inconsistent
1430            generation id so that we can abort the recovery at any stage
1431            and simply restart it from scratch.
1432          */
1433         vnnmap->generation = generation;
1434         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1435         if (ret != 0) {
1436                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1437                 return -1;
1438         }
1439
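             /* start a transaction on all active nodes, tagged with the new
                generation number, so that the database recoveries below can
                be committed together once every database has been pushed out */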
1440         data.dptr = (void *)&generation;
1441         data.dsize = sizeof(uint32_t);
1442
1443         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1444         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1445                                         nodes, 0,
1446                                         CONTROL_TIMEOUT(), false, data,
1447                                         NULL,
1448                                         transaction_start_fail_callback,
1449                                         rec) != 0) {
1450                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1451                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1452                                         nodes, 0,
1453                                         CONTROL_TIMEOUT(), false, tdb_null,
1454                                         NULL,
1455                                         NULL,
1456                                         NULL) != 0) {
1457                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1458                 }
1459                 return -1;
1460         }
1461
1462         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1463
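             /* recover the content of each database in turn, using the new
                generation number for the recovery transaction */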
1464         for (i=0;i<dbmap->num;i++) {
1465                 ret = recover_database(rec, mem_ctx,
1466                                        dbmap->dbs[i].dbid,
1467                                        dbmap->dbs[i].persistent,
1468                                        pnn, nodemap, generation);
1469                 if (ret != 0) {
1470                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1471                         return -1;
1472                 }
1473         }
1474
1475         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1476
1477         /* commit all the changes */
1478         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1479                                         nodes, 0,
1480                                         CONTROL_TIMEOUT(), false, data,
1481                                         NULL, NULL,
1482                                         NULL) != 0) {
1483                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1484                 return -1;
1485         }
1486
1487         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1488         
1489
1490         /* update the capabilities for all nodes */
1491         ret = update_capabilities(ctdb, nodemap);
1492         if (ret!=0) {
1493                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1494                 return -1;
1495         }
1496
1497         /* build a new vnn map with all the currently active and
1498            unbanned nodes */
1499         generation = new_generation();
1500         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1501         CTDB_NO_MEMORY(ctdb, vnnmap);
1502         vnnmap->generation = generation;
1503         vnnmap->size = 0;
1504         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1505         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1506         for (i=j=0;i<nodemap->num;i++) {
1507                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1508                         continue;
1509                 }
1510                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1511                         /* this node cannot be an lmaster */
1512                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1513                         continue;
1514                 }
1515
1516                 vnnmap->size++;
1517                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1518                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1519                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1520
1521         }
1522         if (vnnmap->size == 0) {
1523                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1524                 vnnmap->size++;
1525                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1526                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1527                 vnnmap->map[0] = pnn;
1528         }       
1529
1530         /* update to the new vnnmap on all nodes */
1531         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1532         if (ret != 0) {
1533                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1534                 return -1;
1535         }
1536
1537         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1538
1539         /* update recmaster to point to us for all nodes */
1540         ret = set_recovery_master(ctdb, nodemap, pnn);
1541         if (ret!=0) {
1542                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1543                 return -1;
1544         }
1545
1546         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1547
1548         /*
1549           update all nodes to have the same flags that we have
1550          */
1551         for (i=0;i<nodemap->num;i++) {
1552                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1553                         continue;
1554                 }
1555
1556                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1557                 if (ret != 0) {
1558                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1559                         return -1;
1560                 }
1561         }
1562
1563         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1564
1565         /* disable recovery mode */
1566         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1567         if (ret != 0) {
1568                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1569                 return -1;
1570         }
1571
1572         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1573
1574         /*
1575           tell nodes to takeover their public IPs
1576          */
1577         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1578         if (ret != 0) {
1579                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1580                                  culprit));
1581                 return -1;
1582         }
1583         rec->need_takeover_run = false;
1584         ret = ctdb_takeover_run(ctdb, nodemap);
1585         if (ret != 0) {
1586                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1587                 return -1;
1588         }
1589         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1590
1591         /* execute the "recovered" event script on all nodes */
1592         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1593         if (ret!=0) {
1594                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1595                 return -1;
1596         }
1597
1598         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1599
1600         /* send a message to all clients telling them that the cluster 
1601            has been reconfigured */
1602         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1603
1604         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1605
1606         rec->need_recovery = false;
1607
1608         /* we managed to complete a full recovery, make sure to forgive
1609            any past sins by the nodes that could now participate in the
1610            recovery.
1611         */
1612         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1613         for (i=0;i<nodemap->num;i++) {
1614                 struct ctdb_banning_state *ban_state;
1615
1616                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1617                         continue;
1618                 }
1619
1620                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1621                 if (ban_state == NULL) {
1622                         continue;
1623                 }
1624
1625                 ban_state->count = 0;
1626         }
1627
1628
1629         /* We just finished a recovery successfully. 
1630            We now wait for rerecovery_timeout before we allow 
1631            another recovery to take place.
1632         */
1633         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1634         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1635         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1636
1637         return 0;
1638 }
1639
1640
1641 /*
1642   elections are won by first checking the number of connected nodes, then
1643   the priority time, then the pnn
1644  */
1645 struct election_message {
1646         uint32_t num_connected;
1647         struct timeval priority_time;
1648         uint32_t pnn;
1649         uint32_t node_flags;
1650 };
1651
1652 /*
1653   form this node's election data
1654  */
1655 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1656 {
1657         int ret, i;
1658         struct ctdb_node_map *nodemap;
1659         struct ctdb_context *ctdb = rec->ctdb;
1660
1661         ZERO_STRUCTP(em);
1662
1663         em->pnn = rec->ctdb->pnn;
1664         em->priority_time = rec->priority_time;
1665
1666         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1667         if (ret != 0) {
1668                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1669                 return;
1670         }
1671
1672         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1673         em->node_flags = rec->node_flags;
1674
1675         for (i=0;i<nodemap->num;i++) {
1676                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1677                         em->num_connected++;
1678                 }
1679         }
1680
1681         /* we shouldn't try to win this election if we can't be a recmaster */
1682         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1683                 em->num_connected = 0;
1684                 em->priority_time = timeval_current();
1685         }
1686
1687         talloc_free(nodemap);
1688 }
1689
1690 /*
1691   see if the given election data wins
1692  */
1693 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1694 {
1695         struct election_message myem;
1696         int cmp = 0;
1697
1698         ctdb_election_data(rec, &myem);
1699
1700         /* we can't win if we don't have the recmaster capability */
1701         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1702                 return false;
1703         }
1704
1705         /* we can't win if we are banned */
1706         if (rec->node_flags & NODE_FLAGS_BANNED) {
1707                 return false;
1708         }       
1709
1710         /* we can't win if we are stopped */
1711         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1712                 return false;
1713         }       
1714
1715         /* we will automatically win if the other node is banned */
1716         if (em->node_flags & NODE_FLAGS_BANNED) {
1717                 return true;
1718         }
1719
1720         /* we will automatically win if the other node is stopped */
1721         if (em->node_flags & NODE_FLAGS_STOPPED) {
1722                 return true;
1723         }
1724
1725         /* try to use the most connected node */
1726         if (cmp == 0) {
1727                 cmp = (int)myem.num_connected - (int)em->num_connected;
1728         }
1729
1730         /* then the longest running node */
1731         if (cmp == 0) {
1732                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1733         }
1734
1735         if (cmp == 0) {
1736                 cmp = (int)myem.pnn - (int)em->pnn;
1737         }
1738
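             /* a positive cmp means our election data beats the data we were
                sent, so we should contest the election */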
1739         return cmp > 0;
1740 }
1741
1742 /*
1743   send out an election request
1744  */
1745 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1746 {
1747         int ret;
1748         TDB_DATA election_data;
1749         struct election_message emsg;
1750         uint64_t srvid;
1751         struct ctdb_context *ctdb = rec->ctdb;
1752
1753         srvid = CTDB_SRVID_RECOVERY;
1754
1755         ctdb_election_data(rec, &emsg);
1756
1757         election_data.dsize = sizeof(struct election_message);
1758         election_data.dptr  = (unsigned char *)&emsg;
1759
1760
1761         /* send an election message to all active nodes */
1762         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1763         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1764
1765
1766         /* A new node that is already frozen has entered the cluster.
1767            The existing nodes are not frozen and don't need to be frozen
1768            until the election has ended and we start the actual recovery
1769         */
1770         if (update_recmaster == true) {
1771                 /* first we assume we will win the election and set 
1772                    recoverymaster to be ourself on the current node
1773                  */
1774                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1775                 if (ret != 0) {
1776                         DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on local node\n"));
1777                         return -1;
1778                 }
1779         }
1780
1781
1782         return 0;
1783 }
1784
1785 /*
1786   this function will unban all nodes in the cluster
1787 */
1788 static void unban_all_nodes(struct ctdb_context *ctdb)
1789 {
1790         int ret, i;
1791         struct ctdb_node_map *nodemap;
1792         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1793         
1794         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1795         if (ret != 0) {
1796                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1797                 return;
1798         }
1799
1800         for (i=0;i<nodemap->num;i++) {
1801                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1802                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1803                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1804                 }
1805         }
1806
1807         talloc_free(tmp_ctx);
1808 }
1809
1810
1811 /*
1812   we think we are winning the election - send a broadcast election request
1813  */
1814 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1815 {
1816         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1817         int ret;
1818
1819         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1820         if (ret != 0) {
1821                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1822         }
1823
1824         talloc_free(rec->send_election_te);
1825         rec->send_election_te = NULL;
1826 }
1827
1828 /*
1829   handler for memory dumps
1830 */
1831 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1832                              TDB_DATA data, void *private_data)
1833 {
1834         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1835         TDB_DATA *dump;
1836         int ret;
1837         struct rd_memdump_reply *rd;
1838
1839         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1840                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1841                 talloc_free(tmp_ctx);
1842                 return;
1843         }
1844         rd = (struct rd_memdump_reply *)data.dptr;
1845
1846         dump = talloc_zero(tmp_ctx, TDB_DATA);
1847         if (dump == NULL) {
1848                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1849                 talloc_free(tmp_ctx);
1850                 return;
1851         }
1852         ret = ctdb_dump_memory(ctdb, dump);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1855                 talloc_free(tmp_ctx);
1856                 return;
1857         }
1858
1859         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1860
1861         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1862         if (ret != 0) {
1863                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1864                 talloc_free(tmp_ctx);
1865                 return;
1866         }
1867
1868         talloc_free(tmp_ctx);
1869 }
1870
1871 /*
1872   handler for reload_nodes
1873 */
1874 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1875                              TDB_DATA data, void *private_data)
1876 {
1877         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1878
1879         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1880
1881         reload_nodes_file(rec->ctdb);
1882 }
1883
1884
1885 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1886                               struct timeval yt, void *p)
1887 {
1888         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1889
1890         talloc_free(rec->ip_check_disable_ctx);
1891         rec->ip_check_disable_ctx = NULL;
1892 }
1893
1894
1895 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1896                              TDB_DATA data, void *private_data)
1897 {
1898         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1899         struct ctdb_public_ip *ip;
1900
1901         if (rec->recmaster != rec->ctdb->pnn) {
1902                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
1903                 return;
1904         }
1905
1906         if (data.dsize != sizeof(struct ctdb_public_ip)) {
1907                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
1908                 return;
1909         }
1910
1911         ip = (struct ctdb_public_ip *)data.dptr;
1912
1913         update_ip_assignment_tree(rec->ctdb, ip);
1914 }
1915
1916
1917 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1918                              TDB_DATA data, void *private_data)
1919 {
1920         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1921         uint32_t timeout;
1922
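             /* cancel any previously scheduled re-enable of the ip check
                before starting a new disable period */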
1923         if (rec->ip_check_disable_ctx != NULL) {
1924                 talloc_free(rec->ip_check_disable_ctx);
1925                 rec->ip_check_disable_ctx = NULL;
1926         }
1927
1928         if (data.dsize != sizeof(uint32_t)) {
1929                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1930                                  "expecting %lu\n", (long unsigned)data.dsize,
1931                                  (long unsigned)sizeof(uint32_t)));
1932                 return;
1933         }
1934         if (data.dptr == NULL) {
1935                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1936                 return;
1937         }
1938
1939         timeout = *((uint32_t *)data.dptr);
1940         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1941
1942         rec->ip_check_disable_ctx = talloc_new(rec);
1943         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1944
1945         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1946 }
1947
1948
1949 /*
1950   handler for ip reallocate, just add it to the list of callers and 
1951   handle this later in the monitor_cluster loop so we do not recurse
1952   with other callers to takeover_run()
1953 */
1954 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1955                              TDB_DATA data, void *private_data)
1956 {
1957         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1958         struct ip_reallocate_list *caller;
1959
1960         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1961                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1962                 return;
1963         }
1964
1965         if (rec->ip_reallocate_ctx == NULL) {
1966                 rec->ip_reallocate_ctx = talloc_new(rec);
1967                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1968         }
1969
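             /* remember the caller's return address so that
                process_ipreallocate_requests() can send a reply once the
                takeover run has been performed */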
1970         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1971         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1972
1973         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1974         caller->next = rec->reallocate_callers;
1975         rec->reallocate_callers = caller;
1976
1977         return;
1978 }
1979
1980 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1981 {
1982         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1983         TDB_DATA result;
1984         int32_t ret;
1985         struct ip_reallocate_list *callers;
1986         uint32_t culprit;
1987
1988         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1989
1990         /* update the list of public ips that a node can handle for
1991            all connected nodes
1992         */
1993         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
1994         if (ret != 0) {
1995                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1996                                  culprit));
1997                 rec->need_takeover_run = true;
1998         }
1999         if (ret == 0) {
2000                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2001                 if (ret != 0) {
2002                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: "
2003                                          "ctdb_takeover_run() failed\n"));
2004                         rec->need_takeover_run = true;
2005                 }
2006         }
2007
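             /* pack the result of the takeover run so it can be sent back to
                every caller that asked for a reply */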
2008         result.dsize = sizeof(int32_t);
2009         result.dptr  = (uint8_t *)&ret;
2010
2011         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2012
2013                 /* Someone that sent srvid==0 does not want a reply */
2014                 if (callers->rd->srvid == 0) {
2015                         continue;
2016                 }
2017                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2018                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2019                                   (unsigned long long)callers->rd->srvid));
2020                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2021                 if (ret != 0) {
2022                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2023                                          "message to %u:%llu\n",
2024                                          (unsigned)callers->rd->pnn,
2025                                          (unsigned long long)callers->rd->srvid));
2026                 }
2027         }
2028
2029         talloc_free(tmp_ctx);
2030         talloc_free(rec->ip_reallocate_ctx);
2031         rec->ip_reallocate_ctx = NULL;
2032         rec->reallocate_callers = NULL;
2033         
2034 }
2035
2036
2037 /*
2038   handler for recovery master elections
2039 */
2040 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2041                              TDB_DATA data, void *private_data)
2042 {
2043         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2044         int ret;
2045         struct election_message *em = (struct election_message *)data.dptr;
2046         TALLOC_CTX *mem_ctx;
2047
2048         /* we got an election packet - update the timeout for the election */
2049         talloc_free(rec->election_timeout);
2050         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2051                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2052                                                 ctdb_election_timeout, rec);
2053
2054         mem_ctx = talloc_new(ctdb);
2055
2056         /* someone called an election. check their election data
2057            and if we disagree and we would rather be the elected node, 
2058            send a new election message to all other nodes
2059          */
2060         if (ctdb_election_win(rec, em)) {
2061                 if (!rec->send_election_te) {
2062                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2063                                                                 timeval_current_ofs(0, 500000),
2064                                                                 election_send_request, rec);
2065                 }
2066                 talloc_free(mem_ctx);
2067                 /*unban_all_nodes(ctdb);*/
2068                 return;
2069         }
2070         
2071         /* we didn't win */
2072         talloc_free(rec->send_election_te);
2073         rec->send_election_te = NULL;
2074
2075         if (ctdb->tunable.verify_recovery_lock != 0) {
2076                 /* release the recmaster lock */
2077                 if (em->pnn != ctdb->pnn &&
2078                     ctdb->recovery_lock_fd != -1) {
2079                         close(ctdb->recovery_lock_fd);
2080                         ctdb->recovery_lock_fd = -1;
2081                         unban_all_nodes(ctdb);
2082                 }
2083         }
2084
2085         /* ok, let that guy become recmaster then */
2086         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2087         if (ret != 0) {
2088                 DEBUG(DEBUG_ERR, (__location__ " failed to set recmaster on local node\n"));
2089                 talloc_free(mem_ctx);
2090                 return;
2091         }
2092
2093         talloc_free(mem_ctx);
2094         return;
2095 }
2096
2097
2098 /*
2099   force the start of the election process
2100  */
2101 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2102                            struct ctdb_node_map *nodemap)
2103 {
2104         int ret;
2105         struct ctdb_context *ctdb = rec->ctdb;
2106
2107         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2108
2109         /* set all nodes to recovery mode to stop all internode traffic */
2110         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2111         if (ret != 0) {
2112                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2113                 return;
2114         }
2115
2116         talloc_free(rec->election_timeout);
2117         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2118                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2119                                                 ctdb_election_timeout, rec);
2120
2121         ret = send_election_request(rec, pnn, true);
2122         if (ret!=0) {
2123                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2124                 return;
2125         }
2126
2127         /* wait for a few seconds to collect all responses */
2128         ctdb_wait_election(rec);
2129 }
2130
2131
2132
2133 /*
2134   handler for when a node changes its flags
2135 */
2136 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2137                             TDB_DATA data, void *private_data)
2138 {
2139         int ret;
2140         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2141         struct ctdb_node_map *nodemap=NULL;
2142         TALLOC_CTX *tmp_ctx;
2143         uint32_t changed_flags;
2144         int i;
2145         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2146         int disabled_flag_changed;
2147
2148         if (data.dsize != sizeof(*c)) {
2149                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2150                 return;
2151         }
2152
2153         tmp_ctx = talloc_new(ctdb);
2154         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2155
2156         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2157         if (ret != 0) {
2158                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2159                 talloc_free(tmp_ctx);
2160                 return;         
2161         }
2162
2163
2164         for (i=0;i<nodemap->num;i++) {
2165                 if (nodemap->nodes[i].pnn == c->pnn) break;
2166         }
2167
2168         if (i == nodemap->num) {
2169                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2170                 talloc_free(tmp_ctx);
2171                 return;
2172         }
2173
2174         changed_flags = c->old_flags ^ c->new_flags;
2175
2176         if (nodemap->nodes[i].flags != c->new_flags) {
2177                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2178         }
2179
2180         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2181
2182         nodemap->nodes[i].flags = c->new_flags;
2183
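             /* only consider triggering a takeover run if we are the
                recmaster and the cluster is in normal recovery mode */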
2184         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2185                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2186
2187         if (ret == 0) {
2188                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2189                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2190         }
2191         
2192         if (ret == 0 &&
2193             ctdb->recovery_master == ctdb->pnn &&
2194             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2195                 /* Only do the takeover run if the permanently disabled or
2196                    unhealthy flags changed, since these cause an ip failover
2197                    but not a recovery.
2198                    If the node became disconnected or banned this will also
2199                    lead to an ip address failover, but that is handled
2200                    during recovery.
2201                 */
2202                 if (disabled_flag_changed) {
2203                         rec->need_takeover_run = true;
2204                 }
2205         }
2206
2207         talloc_free(tmp_ctx);
2208 }
2209
2210 /*
2211   handler for when we need to push out flag changes to all other nodes
2212 */
2213 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2214                             TDB_DATA data, void *private_data)
2215 {
2216         int ret;
2217         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2218         struct ctdb_node_map *nodemap=NULL;
2219         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2220         uint32_t recmaster;
2221         uint32_t *nodes;
2222
2223         /* find the recovery master */
2224         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2225         if (ret != 0) {
2226                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2227                 talloc_free(tmp_ctx);
2228                 return;
2229         }
2230
2231         /* read the node flags from the recmaster */
2232         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2233         if (ret != 0) {
2234                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster %u\n", recmaster));
2235                 talloc_free(tmp_ctx);
2236                 return;
2237         }
2238         if (c->pnn >= nodemap->num) {
2239                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2240                 talloc_free(tmp_ctx);
2241                 return;
2242         }
2243
2244         /* send the flags update to all connected nodes */
2245         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2246
2247         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2248                                       nodes, 0, CONTROL_TIMEOUT(),
2249                                       false, data,
2250                                       NULL, NULL,
2251                                       NULL) != 0) {
2252                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2253
2254                 talloc_free(tmp_ctx);
2255                 return;
2256         }
2257
2258         talloc_free(tmp_ctx);
2259 }
2260
2261
2262 struct verify_recmode_normal_data {
2263         uint32_t count;
2264         enum monitor_result status;
2265 };
2266
2267 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2268 {
2269         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2270
2271
2272         /* one more node has responded with recmode data*/
2273         rmdata->count--;
2274
2275         /* if we failed to get the recmode, then return an error and let
2276            the main loop try again.
2277         */
2278         if (state->state != CTDB_CONTROL_DONE) {
2279                 if (rmdata->status == MONITOR_OK) {
2280                         rmdata->status = MONITOR_FAILED;
2281                 }
2282                 return;
2283         }
2284
2285         /* if we got a response, then the recmode will be stored in the
2286            status field
2287         */
2288         if (state->status != CTDB_RECOVERY_NORMAL) {
2289                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2290                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2291         }
2292
2293         return;
2294 }
2295
2296
2297 /* verify that all nodes are in normal recovery mode */
2298 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2299 {
2300         struct verify_recmode_normal_data *rmdata;
2301         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2302         struct ctdb_client_control_state *state;
2303         enum monitor_result status;
2304         int j;
2305         
2306         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2307         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2308         rmdata->count  = 0;
2309         rmdata->status = MONITOR_OK;
2310
2311         /* loop over all active nodes and send an async getrecmode call to 
2312            them*/
2313         for (j=0; j<nodemap->num; j++) {
2314                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2315                         continue;
2316                 }
2317                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2318                                         CONTROL_TIMEOUT(), 
2319                                         nodemap->nodes[j].pnn);
2320                 if (state == NULL) {
2321                         /* we failed to send the control, treat this as 
2322                            an error and try again next iteration
2323                         */                      
2324                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2325                         talloc_free(mem_ctx);
2326                         return MONITOR_FAILED;
2327                 }
2328
2329                 /* set up the callback functions */
2330                 state->async.fn = verify_recmode_normal_callback;
2331                 state->async.private_data = rmdata;
2332
2333                 /* one more control to wait for to complete */
2334                 rmdata->count++;
2335         }
2336
2337
2338         /* now wait for up to the maximum number of seconds allowed
2339            or until all nodes we expect a response from have replied
2340         */
2341         while (rmdata->count > 0) {
2342                 event_loop_once(ctdb->ev);
2343         }
2344
2345         status = rmdata->status;
2346         talloc_free(mem_ctx);
2347         return status;
2348 }
2349
2350
2351 struct verify_recmaster_data {
2352         struct ctdb_recoverd *rec;
2353         uint32_t count;
2354         uint32_t pnn;
2355         enum monitor_result status;
2356 };
2357
2358 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2359 {
2360         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2361
2362
2363         /* one more node has responded with recmaster data*/
2364         rmdata->count--;
2365
2366         /* if we failed to get the recmaster, then return an error and let
2367            the main loop try again.
2368         */
2369         if (state->state != CTDB_CONTROL_DONE) {
2370                 if (rmdata->status == MONITOR_OK) {
2371                         rmdata->status = MONITOR_FAILED;
2372                 }
2373                 return;
2374         }
2375
2376         /* if we got a response, then the recmaster will be stored in the
2377            status field
2378         */
2379         if (state->status != rmdata->pnn) {
2380                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2381                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2382                 rmdata->status = MONITOR_ELECTION_NEEDED;
2383         }
2384
2385         return;
2386 }
2387
2388
2389 /* verify that all nodes agree that we are the recmaster */
2390 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2391 {
2392         struct ctdb_context *ctdb = rec->ctdb;
2393         struct verify_recmaster_data *rmdata;
2394         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2395         struct ctdb_client_control_state *state;
2396         enum monitor_result status;
2397         int j;
2398         
2399         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2400         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2401         rmdata->rec    = rec;
2402         rmdata->count  = 0;
2403         rmdata->pnn    = pnn;
2404         rmdata->status = MONITOR_OK;
2405
2406         /* loop over all active nodes and send an async getrecmaster call to 
2407            them*/
2408         for (j=0; j<nodemap->num; j++) {
2409                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2410                         continue;
2411                 }
2412                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2413                                         CONTROL_TIMEOUT(),
2414                                         nodemap->nodes[j].pnn);
2415                 if (state == NULL) {
2416                         /* we failed to send the control, treat this as 
2417                            an error and try again next iteration
2418                         */                      
2419                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2420                         talloc_free(mem_ctx);
2421                         return MONITOR_FAILED;
2422                 }
2423
2424                 /* set up the callback functions */
2425                 state->async.fn = verify_recmaster_callback;
2426                 state->async.private_data = rmdata;
2427
2428                 /* one more control to wait for to complete */
2429                 rmdata->count++;
2430         }
2431
2432
2433         /* now wait for up to the maximum number of seconds allowed
2434            or until all nodes we expect a response from have replied
2435         */
2436         while (rmdata->count > 0) {
2437                 event_loop_once(ctdb->ev);
2438         }
2439
2440         status = rmdata->status;
2441         talloc_free(mem_ctx);
2442         return status;
2443 }
2444
2445
2446 /* called to check that the local allocation of public ip addresses is ok.
2447 */
2448 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2449 {
2450         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2451         struct ctdb_control_get_ifaces *ifaces = NULL;
2452         struct ctdb_all_public_ips *ips = NULL;
2453         struct ctdb_uptime *uptime1 = NULL;
2454         struct ctdb_uptime *uptime2 = NULL;
2455         int ret, j;
2456         bool need_iface_check = false;
2457         bool need_takeover_run = false;
2458
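             /* take a first uptime snapshot; it is read again after the
                public ip list has been fetched so that a recovery racing
                with this check can be detected and the check skipped */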
2459         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2460                                 CTDB_CURRENT_NODE, &uptime1);
2461         if (ret != 0) {
2462                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2463                 talloc_free(mem_ctx);
2464                 return -1;
2465         }
2466
2467
2468         /* read the interfaces from the local node */
2469         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2470         if (ret != 0) {
2471                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2472                 talloc_free(mem_ctx);
2473                 return -1;
2474         }
2475
2476         if (!rec->ifaces) {
2477                 need_iface_check = true;
2478         } else if (rec->ifaces->num != ifaces->num) {
2479                 need_iface_check = true;
2480         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2481                 need_iface_check = true;
2482         }
2483
2484         if (need_iface_check) {
2485                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2486                                      "local node %u - force takeover run\n",
2487                                      pnn));
2488                 need_takeover_run = true;
2489         }
2490
2491         /* read the ip allocation from the local node */
2492         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2493         if (ret != 0) {
2494                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2495                 talloc_free(mem_ctx);
2496                 return -1;
2497         }
2498
2499         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2500                                 CTDB_CURRENT_NODE, &uptime2);
2501         if (ret != 0) {
2502                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2503                 talloc_free(mem_ctx);
2504                 return -1;
2505         }
2506
2507         /* skip the check if the startrecovery time has changed */
2508         if (timeval_compare(&uptime1->last_recovery_started,
2509                             &uptime2->last_recovery_started) != 0) {
2510                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2511                 talloc_free(mem_ctx);
2512                 return 0;
2513         }
2514
2515         /* skip the check if the endrecovery time has changed */
2516         if (timeval_compare(&uptime1->last_recovery_finished,
2517                             &uptime2->last_recovery_finished) != 0) {
2518                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2519                 talloc_free(mem_ctx);
2520                 return 0;
2521         }
2522
2523         /* skip the check if we have started but not finished recovery */
2524         if (timeval_compare(&uptime1->last_recovery_finished,
2525                             &uptime1->last_recovery_started) != 1) {
2526                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2527                 talloc_free(mem_ctx);
2528
2529                 return 0;
2530         }
2531
2532         talloc_free(rec->ifaces);
2533         rec->ifaces = talloc_steal(rec, ifaces);
2534
2535         /* verify that we have the ip addresses we should have
2536            and that we don't have ones we shouldn't have.
2537            if we find an inconsistency we ask the recmaster to
2538            perform a takeover run so that the ip assignment is
2539            brought back in line with the rest of the cluster
2540         */
2541         for (j=0; j<ips->num; j++) {
2542                 if (ips->ips[j].pnn == pnn) {
2543                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2544                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2545                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2546                                 need_takeover_run = true;
2547                         }
2548                 } else {
2549                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2550                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2551                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2552                                 need_takeover_run = true;
2553                         }
2554                 }
2555         }
2556
2557         if (need_takeover_run) {
2558                 struct takeover_run_reply rd;
2559                 TDB_DATA data;
2560
2561                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2562
2563                 rd.pnn = ctdb->pnn;
2564                 rd.srvid = 0;
2565                 data.dptr = (uint8_t *)&rd;
2566                 data.dsize = sizeof(rd);
2567
2568                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2569                 if (ret != 0) {
2570                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2571                 }
2572         }
2573         talloc_free(mem_ctx);
2574         return 0;
2575 }
2576
2577
2578 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2579 {
2580         struct ctdb_node_map **remote_nodemaps = callback_data;
2581
2582         if (node_pnn >= ctdb->num_nodes) {
2583                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2584                 return;
2585         }
2586
2587         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2588
2589 }
2590
2591 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2592         struct ctdb_node_map *nodemap,
2593         struct ctdb_node_map **remote_nodemaps)
2594 {
2595         uint32_t *nodes;
2596
2597         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2598         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2599                                         nodes, 0,
2600                                         CONTROL_TIMEOUT(), false, tdb_null,
2601                                         async_getnodemap_callback,
2602                                         NULL,
2603                                         remote_nodemaps) != 0) {
2604                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2605
2606                 return -1;
2607         }
2608
2609         return 0;
2610 }
2611
2612 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2613 struct ctdb_check_reclock_state {
2614         struct ctdb_context *ctdb;
2615         struct timeval start_time;
2616         int fd[2];
2617         pid_t child;
2618         struct timed_event *te;
2619         struct fd_event *fde;
2620         enum reclock_child_status status;
2621 };
2622
2623 /* when we free the reclock state we must kill any child process.
2624 */
2625 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2626 {
2627         struct ctdb_context *ctdb = state->ctdb;
2628
2629         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2630
2631         if (state->fd[0] != -1) {
2632                 close(state->fd[0]);
2633                 state->fd[0] = -1;
2634         }
2635         if (state->fd[1] != -1) {
2636                 close(state->fd[1]);
2637                 state->fd[1] = -1;
2638         }
2639         kill(state->child, SIGKILL);
2640         return 0;
2641 }
2642
2643 /*
2644   called if our check_reclock child times out. this would happen if
2645   i/o to the reclock file blocks.
2646  */
2647 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2648                                          struct timeval t, void *private_data)
2649 {
2650         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2651                                            struct ctdb_check_reclock_state);
2652
2653         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2654         state->status = RECLOCK_TIMEOUT;
2655 }
2656
2657 /* this is called when the child process has completed checking the reclock
2658    file and has written data back to us through the pipe.
2659 */
2660 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2661                              uint16_t flags, void *private_data)
2662 {
2663         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2664                                              struct ctdb_check_reclock_state);
2665         char c = 0;
2666         int ret;
2667
2668         /* we got a response from our child process so we can abort the
2669            timeout.
2670         */
2671         talloc_free(state->te);
2672         state->te = NULL;
2673
2674         ret = read(state->fd[0], &c, 1);
2675         if (ret != 1 || c != RECLOCK_OK) {
2676                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2677                 state->status = RECLOCK_FAILED;
2678
2679                 return;
2680         }
2681
2682         state->status = RECLOCK_OK;
2683         return;
2684 }
2685
2686 static int check_recovery_lock(struct ctdb_context *ctdb)
2687 {
2688         int ret;
2689         struct ctdb_check_reclock_state *state;
2690         pid_t parent = getpid();
2691
2692         if (ctdb->recovery_lock_fd == -1) {
2693                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2694                 return -1;
2695         }
2696
2697         state = talloc(ctdb, struct ctdb_check_reclock_state);
2698         CTDB_NO_MEMORY(ctdb, state);
2699
2700         state->ctdb = ctdb;
2701         state->start_time = timeval_current();
2702         state->status = RECLOCK_CHECKING;
2703         state->fd[0] = -1;
2704         state->fd[1] = -1;
2705
2706         ret = pipe(state->fd);
2707         if (ret != 0) {
2708                 talloc_free(state);
2709                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2710                 return -1;
2711         }
2712
2713         state->child = fork();
2714         if (state->child == (pid_t)-1) {
2715                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2716                 close(state->fd[0]);
2717                 state->fd[0] = -1;
2718                 close(state->fd[1]);
2719                 state->fd[1] = -1;
2720                 talloc_free(state);
2721                 return -1;
2722         }
2723
2724         if (state->child == 0) {
2725                 char cc = RECLOCK_OK;
2726                 close(state->fd[0]);
2727                 state->fd[0] = -1;
2728
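                     /* in the child: check that the recovery lock file is
                        still readable.  if the cluster filesystem hangs, this
                        read blocks and the parent notices via its timeout
                        event */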
2729                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2730                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2731                         cc = RECLOCK_FAILED;
2732                 }
2733
2734                 write(state->fd[1], &cc, 1);
2735                 /* make sure we die when our parent dies */
2736                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2737                         sleep(5);
2738                         write(state->fd[1], &cc, 1);
2739                 }
2740                 _exit(0);
2741         }
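             /* parent: close the write end of the pipe and wait for the
                child to report its status byte on the read end */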
2742         close(state->fd[1]);
2743         state->fd[1] = -1;
2744         set_close_on_exec(state->fd[0]);
2745
2746         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2747
2748         talloc_set_destructor(state, check_reclock_destructor);
2749
2750         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2751                                     ctdb_check_reclock_timeout, state);
2752         if (state->te == NULL) {
2753                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2754                 talloc_free(state);
2755                 return -1;
2756         }
2757
2758         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2759                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2760                                 reclock_child_handler,
2761                                 (void *)state);
2762
2763         if (state->fde == NULL) {
2764                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2765                 talloc_free(state);
2766                 return -1;
2767         }
2768
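        /* block in the event loop until the status changes from
           RECLOCK_CHECKING, either because the child has replied or
           because the 15 second timeout has fired */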
2769         while (state->status == RECLOCK_CHECKING) {
2770                 event_loop_once(ctdb->ev);
2771         }
2772
2773         if (state->status == RECLOCK_FAILED) {
2774                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2775                 close(ctdb->recovery_lock_fd);
2776                 ctdb->recovery_lock_fd = -1;
2777                 talloc_free(state);
2778                 return -1;
2779         }
2780
2781         talloc_free(state);
2782         return 0;
2783 }
2784
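/*
  fetch the current reclock file setting from the local ctdb daemon and bring
  the recovery daemon's cached copy in line with it, closing the old file
  descriptor whenever the path changes or the reclock is disabled
 */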
2785 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2786 {
2787         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2788         const char *reclockfile;
2789
2790         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2791                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2792                 talloc_free(tmp_ctx);
2793                 return -1;      
2794         }
2795
2796         if (reclockfile == NULL) {
2797                 if (ctdb->recovery_lock_file != NULL) {
2798                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2799                         talloc_free(ctdb->recovery_lock_file);
2800                         ctdb->recovery_lock_file = NULL;
2801                         if (ctdb->recovery_lock_fd != -1) {
2802                                 close(ctdb->recovery_lock_fd);
2803                                 ctdb->recovery_lock_fd = -1;
2804                         }
2805                 }
2806                 ctdb->tunable.verify_recovery_lock = 0;
2807                 talloc_free(tmp_ctx);
2808                 return 0;
2809         }
2810
2811         if (ctdb->recovery_lock_file == NULL) {
2812                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2813                 if (ctdb->recovery_lock_fd != -1) {
2814                         close(ctdb->recovery_lock_fd);
2815                         ctdb->recovery_lock_fd = -1;
2816                 }
2817                 talloc_free(tmp_ctx);
2818                 return 0;
2819         }
2820
2821
2822         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2823                 talloc_free(tmp_ctx);
2824                 return 0;
2825         }
2826
2827         talloc_free(ctdb->recovery_lock_file);
2828         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2829         ctdb->tunable.verify_recovery_lock = 0;
2830         if (ctdb->recovery_lock_fd != -1) {
2831                 close(ctdb->recovery_lock_fd);
2832                 ctdb->recovery_lock_fd = -1;
2833         }
2834
2835         talloc_free(tmp_ctx);
2836         return 0;
2837 }
2838                 
2839 /*
2840   the main monitoring loop
2841  */
2842 static void monitor_cluster(struct ctdb_context *ctdb)
2843 {
2844         uint32_t pnn;
2845         TALLOC_CTX *mem_ctx=NULL;
2846         struct ctdb_node_map *nodemap=NULL;
2847         struct ctdb_node_map *recmaster_nodemap=NULL;
2848         struct ctdb_node_map **remote_nodemaps=NULL;
2849         struct ctdb_vnn_map *vnnmap=NULL;
2850         struct ctdb_vnn_map *remote_vnnmap=NULL;
2851         int32_t debug_level;
2852         int i, j, ret;
2853         struct ctdb_recoverd *rec;
2854
2855         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2856
2857         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2858         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2859
2860         rec->ctdb = ctdb;
2861
2862         rec->priority_time = timeval_current();
2863
2864         /* register a message port for sending memory dumps */
2865         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2866
2867         /* register a message port for recovery elections */
2868         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2869
2870         /* when nodes are disabled/enabled */
2871         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2872
2873         /* when we are asked to push out a flag change */
2874         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2875
2876         /* register a message port for vacuum fetch */
2877         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2878
2879         /* register a message port for reloadnodes  */
2880         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2881
2882         /* register a message port for performing a takeover run */
2883         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2884
2885         /* register a message port for disabling the ip check for a short while */
2886         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2887
2888         /* register a message port for updating the recovery daemon's node assignment for an ip */
2889         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
2890
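        /* main monitoring loop: each pass below re-reads the cluster state
           and triggers elections, bans, recoveries or IP takeover runs as
           required, then jumps back here */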
2891 again:
2892         if (mem_ctx) {
2893                 talloc_free(mem_ctx);
2894                 mem_ctx = NULL;
2895         }
2896         mem_ctx = talloc_new(ctdb);
2897         if (!mem_ctx) {
2898                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2899                 exit(-1);
2900         }
2901
2902         /* we only check for recovery once every recover_interval seconds */
2903         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2904
2905         /* verify that the main daemon is still running */
2906         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2907                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2908                 exit(-1);
2909         }
2910
2911         /* ping the local daemon to tell it we are alive */
2912         ctdb_ctrl_recd_ping(ctdb);
2913
2914         if (rec->election_timeout) {
2915                 /* an election is in progress */
2916                 goto again;
2917         }
2918
2919         /* read the debug level from the parent and update locally */
2920         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2921         if (ret !=0) {
2922                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2923                 goto again;
2924         }
2925         LogLevel = debug_level;
2926
2927
2928         /* We must check if we need to ban a node here, but we want to do this
2929            as early as possible so we don't wait until we have pulled the node
2930            map from the local node. That's why we use the hardcoded value 20.
2931         */
2932         for (i=0; i<ctdb->num_nodes; i++) {
2933                 struct ctdb_banning_state *ban_state;
2934
2935                 if (ctdb->nodes[i]->ban_state == NULL) {
2936                         continue;
2937                 }
2938                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2939                 if (ban_state->count < 20) {
2940                         continue;
2941                 }
2942                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2943                         ctdb->nodes[i]->pnn, ban_state->count,
2944                         ctdb->tunable.recovery_ban_period));
2945                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2946                 ban_state->count = 0;
2947         }
2948
2949         /* get relevant tunables */
2950         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2951         if (ret != 0) {
2952                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2953                 goto again;
2954         }
2955
2956         /* get the current recovery lock file from the server */
2957         if (update_recovery_lock_file(ctdb) != 0) {
2958                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2959                 goto again;
2960         }
2961
2962         /* Make sure that if recovery lock verification becomes disabled,
2963            we close the file
2964         */
2965         if (ctdb->tunable.verify_recovery_lock == 0) {
2966                 if (ctdb->recovery_lock_fd != -1) {
2967                         close(ctdb->recovery_lock_fd);
2968                         ctdb->recovery_lock_fd = -1;
2969                 }
2970         }
2971
2972         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2973         if (pnn == (uint32_t)-1) {
2974                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2975                 goto again;
2976         }
2977
2978         /* get the vnnmap */
2979         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2980         if (ret != 0) {
2981                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2982                 goto again;
2983         }
2984
2985
2986         /* get number of nodes */
2987         if (rec->nodemap) {
2988                 talloc_free(rec->nodemap);
2989                 rec->nodemap = NULL;
2990                 nodemap=NULL;
2991         }
2992         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2993         if (ret != 0) {
2994                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2995                 goto again;
2996         }
2997         nodemap = rec->nodemap;
2998
2999         /* check which node is the recovery master */
3000         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3001         if (ret != 0) {
3002                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3003                 goto again;
3004         }
3005
3006         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3007         if (rec->recmaster != pnn) {
3008                 if (rec->ip_reallocate_ctx != NULL) {
3009                         talloc_free(rec->ip_reallocate_ctx);
3010                         rec->ip_reallocate_ctx = NULL;
3011                         rec->reallocate_callers = NULL;
3012                 }
3013         }
3014         /* if there are takeovers requested, perform it and notify the waiters */
3015         if (rec->reallocate_callers) {
3016                 process_ipreallocate_requests(ctdb, rec);
3017         }
3018
3019         if (rec->recmaster == (uint32_t)-1) {
3020                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3021                 force_election(rec, pnn, nodemap);
3022                 goto again;
3023         }
3024
3025
3026         /* if the local daemon is STOPPED, we verify that the databases are
3027            also frozen and that the recmode is set to active
3028         */
3029         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3030                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3031                 if (ret != 0) {
3032                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3033                 }
3034                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3035                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3036
3037                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3038                         if (ret != 0) {
3039                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3040                                 goto again;
3041                         }
3042                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3043                         if (ret != 0) {
3044                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3045
3046                                 goto again;
3047                         }
3048                         goto again;
3049                 }
3050         }
3051         /* If the local node is stopped, check whether we are the recmaster
3052            and, if so, yield that role
3053         */
3054         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3055                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3056                 force_election(rec, pnn, nodemap);
3057                 goto again;
3058         }
3059         
3060         /* check that we (recovery daemon) and the local ctdb daemon
3061            agree on whether we are banned or not
3062         */
3063 //qqq
3064
3065         /* remember our own node flags */
3066         rec->node_flags = nodemap->nodes[pnn].flags;
3067
3068         /* count how many active nodes there are */
3069         rec->num_active    = 0;
3070         rec->num_connected = 0;
3071         for (i=0; i<nodemap->num; i++) {
3072                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3073                         rec->num_active++;
3074                 }
3075                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3076                         rec->num_connected++;
3077                 }
3078         }
3079
3080
3081         /* verify that the recmaster node is still active */
3082         for (j=0; j<nodemap->num; j++) {
3083                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3084                         break;
3085                 }
3086         }
3087
3088         if (j == nodemap->num) {
3089                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3090                 force_election(rec, pnn, nodemap);
3091                 goto again;
3092         }
3093
3094         /* if recovery master is disconnected we must elect a new recmaster */
3095         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3096                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3097                 force_election(rec, pnn, nodemap);
3098                 goto again;
3099         }
3100
3101         /* grab the nodemap from the recovery master to check if it is banned */
3102         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3103                                    mem_ctx, &recmaster_nodemap);
3104         if (ret != 0) {
3105                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3106                           nodemap->nodes[j].pnn));
3107                 goto again;
3108         }
3109
3110
3111         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3112                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3113                 force_election(rec, pnn, nodemap);
3114                 goto again;
3115         }
3116
3117
3118         /* verify that we have all the ip addresses we should have and don't
3119          * have addresses we shouldn't have.
3120          */
3121         if (ctdb->do_checkpublicip) {
3122                 if (rec->ip_check_disable_ctx == NULL) {
3123                         if (verify_local_ip_allocation(ctdb, rec, pnn) != 0) {
3124                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3125                         }
3126                 }
3127         }
3128
3129
3130         /* if we are not the recmaster then we do not need to check
3131            if recovery is needed
3132          */
3133         if (pnn != rec->recmaster) {
3134                 goto again;
3135         }
3136
3137
3138         /* ensure our local copies of flags are right */
3139         ret = update_local_flags(rec, nodemap);
3140         if (ret == MONITOR_ELECTION_NEEDED) {
3141                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3142                 force_election(rec, pnn, nodemap);
3143                 goto again;
3144         }
3145         if (ret != MONITOR_OK) {
3146                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3147                 goto again;
3148         }
3149
3150         if (ctdb->num_nodes != nodemap->num) {
3151                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3152                 reload_nodes_file(ctdb);
3153                 goto again;
3154         }
3155
3156         /* verify that all active nodes agree that we are the recmaster */
3157         switch (verify_recmaster(rec, nodemap, pnn)) {
3158         case MONITOR_RECOVERY_NEEDED:
3159                 /* can not happen */
3160                 goto again;
3161         case MONITOR_ELECTION_NEEDED:
3162                 force_election(rec, pnn, nodemap);
3163                 goto again;
3164         case MONITOR_OK:
3165                 break;
3166         case MONITOR_FAILED:
3167                 goto again;
3168         }
3169
3170
3171         if (rec->need_recovery) {
3172                 /* a previous recovery didn't finish */
3173                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3174                 goto again;             
3175         }
3176
3177         /* verify that all active nodes are in normal mode 
3178            and not in recovery mode 
3179         */
3180         switch (verify_recmode(ctdb, nodemap)) {
3181         case MONITOR_RECOVERY_NEEDED:
3182                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3183                 goto again;
3184         case MONITOR_FAILED:
3185                 goto again;
3186         case MONITOR_ELECTION_NEEDED:
3187                 /* can not happen */
3188         case MONITOR_OK:
3189                 break;
3190         }
3191
3192
3193         if (ctdb->tunable.verify_recovery_lock != 0) {
3194                 /* we should have the reclock - check it's not stale */
3195                 ret = check_recovery_lock(ctdb);
3196                 if (ret != 0) {
3197                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3198                         ctdb_set_culprit(rec, ctdb->pnn);
3199                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3200                         goto again;
3201                 }
3202         }
3203
3204         /* get the nodemap for all active remote nodes
3205          */
3206         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3207         if (remote_nodemaps == NULL) {
3208                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3209                 goto again;
3210         }
3211         for(i=0; i<nodemap->num; i++) {
3212                 remote_nodemaps[i] = NULL;
3213         }
3214         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3215                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3216                 goto again;
3217         } 
3218
3219         /* verify that all other nodes have the same nodemap as we have
3220         */
3221         for (j=0; j<nodemap->num; j++) {
3222                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3223                         continue;
3224                 }
3225
3226                 if (remote_nodemaps[j] == NULL) {
3227                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3228                         ctdb_set_culprit(rec, j);
3229
3230                         goto again;
3231                 }
3232
3233                 /* if the nodes disagree on how many nodes there are
3234                    then this is a good reason to try recovery
3235                  */
3236                 if (remote_nodemaps[j]->num != nodemap->num) {
3237                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3238                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3239                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3240                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3241                         goto again;
3242                 }
3243
3244                 /* if the nodes disagree on which nodes exist and are
3245                    active, then that is also a good reason to do recovery
3246                  */
3247                 for (i=0;i<nodemap->num;i++) {
3248                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3249                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3250                                           nodemap->nodes[j].pnn, i, 
3251                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3252                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3253                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3254                                             vnnmap);
3255                                 goto again;
3256                         }
3257                 }
3258
3259                 /* verify the flags are consistent
3260                 */
3261                 for (i=0; i<nodemap->num; i++) {
3262                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3263                                 continue;
3264                         }
3265                         
3266                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3267                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3268                                   nodemap->nodes[j].pnn, 
3269                                   nodemap->nodes[i].pnn, 
3270                                   remote_nodemaps[j]->nodes[i].flags,
3271                                   nodemap->nodes[i].flags));
3272                                 if (i == j) {
3273                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3274                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3275                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3276                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3277                                                     vnnmap);
3278                                         goto again;
3279                                 } else {
3280                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3281                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3282                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3283                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3284                                                     vnnmap);
3285                                         goto again;
3286                                 }
3287                         }
3288                 }
3289         }
3290
3291
3292         /* there better be the same number of lmasters in the vnn map
3293            as there are active nodes or we will have to do a recovery
3294          */
3295         if (vnnmap->size != rec->num_active) {
3296                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3297                           vnnmap->size, rec->num_active));
3298                 ctdb_set_culprit(rec, ctdb->pnn);
3299                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3300                 goto again;
3301         }
3302
3303         /* verify that all active nodes in the nodemap also exist in 
3304            the vnnmap.
3305          */
3306         for (j=0; j<nodemap->num; j++) {
3307                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3308                         continue;
3309                 }
3310                 if (nodemap->nodes[j].pnn == pnn) {
3311                         continue;
3312                 }
3313
3314                 for (i=0; i<vnnmap->size; i++) {
3315                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3316                                 break;
3317                         }
3318                 }
3319                 if (i == vnnmap->size) {
3320                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3321                                   nodemap->nodes[j].pnn));
3322                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3323                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3324                         goto again;
3325                 }
3326         }
3327
3328         
3329         /* verify that all other nodes have the same vnnmap
3330            and are from the same generation
3331          */
3332         for (j=0; j<nodemap->num; j++) {
3333                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3334                         continue;
3335                 }
3336                 if (nodemap->nodes[j].pnn == pnn) {
3337                         continue;
3338                 }
3339
3340                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3341                                           mem_ctx, &remote_vnnmap);
3342                 if (ret != 0) {
3343                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3344                                   nodemap->nodes[j].pnn));
3345                         goto again;
3346                 }
3347
3348                 /* verify the vnnmap generation is the same */
3349                 if (vnnmap->generation != remote_vnnmap->generation) {
3350                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3351                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3352                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3353                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3354                         goto again;
3355                 }
3356
3357                 /* verify the vnnmap size is the same */
3358                 if (vnnmap->size != remote_vnnmap->size) {
3359                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3360                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3361                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3362                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3363                         goto again;
3364                 }
3365
3366                 /* verify the vnnmap is the same */
3367                 for (i=0;i<vnnmap->size;i++) {
3368                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3369                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3370                                           nodemap->nodes[j].pnn));
3371                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3372                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3373                                             vnnmap);
3374                                 goto again;
3375                         }
3376                 }
3377         }
3378
3379         /* we might need to change who has what IP assigned */
3380         if (rec->need_takeover_run) {
3381                 uint32_t culprit = (uint32_t)-1;
3382
3383                 rec->need_takeover_run = false;
3384
3385                 /* update the list of public ips that a node can handle for
3386                    all connected nodes
3387                 */
3388                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3389                 if (ret != 0) {
3390                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3391                                          culprit));
3392                         ctdb_set_culprit(rec, culprit);
3393                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3394                         goto again;
3395                 }
3396
3397                 /* execute the "startrecovery" event script on all nodes */
3398                 ret = run_startrecovery_eventscript(rec, nodemap);
3399                 if (ret!=0) {
3400                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3401                         ctdb_set_culprit(rec, ctdb->pnn);
3402                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3403                         goto again;
3404                 }
3405
3406                 ret = ctdb_takeover_run(ctdb, nodemap);
3407                 if (ret != 0) {
3408                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3409                         ctdb_set_culprit(rec, ctdb->pnn);
3410                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3411                         goto again;
3412                 }
3413
3414                 /* execute the "recovered" event script on all nodes */
3415                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3416 #if 0
3417 // we can't check whether the event completed successfully
3418 // since this script WILL fail if the node is in recovery mode
3419 // and if that race happens, the code here would just cause a second
3420 // cascading recovery.
3421                 if (ret!=0) {
3422                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3423                         ctdb_set_culprit(rec, ctdb->pnn);
3424                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3425                 }
3426 #endif
3427         }
3428
3429
3430         goto again;
3431
3432 }
3433
3434 /*
3435   event handler for when the main ctdbd dies
3436  */
3437 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3438                                  uint16_t flags, void *private_data)
3439 {
3440         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3441         _exit(1);
3442 }
3443
3444 /*
3445   called regularly to verify that the recovery daemon is still running
3446  */
3447 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3448                               struct timeval yt, void *p)
3449 {
3450         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3451
3452         if (kill(ctdb->recoverd_pid, 0) != 0) {
3453                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3454
3455                 ctdb_stop_recoverd(ctdb);
3456                 ctdb_stop_keepalive(ctdb);
3457                 ctdb_stop_monitoring(ctdb);
3458                 ctdb_release_all_ips(ctdb);
3459                 if (ctdb->methods != NULL) {
3460                         ctdb->methods->shutdown(ctdb);
3461                 }
3462                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3463
3464                 exit(10);       
3465         }
3466
3467         event_add_timed(ctdb->ev, ctdb, 
3468                         timeval_current_ofs(30, 0),
3469                         ctdb_check_recd, ctdb);
3470 }
3471
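/*
  SIGCHLD handler for the recovery daemon: reap all exited child processes
  so they do not remain as zombies
 */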
3472 static void recd_sig_child_handler(struct event_context *ev,
3473         struct signal_event *se, int signum, int count,
3474         void *dont_care, 
3475         void *private_data)
3476 {
3477 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3478         int status;
3479         pid_t pid = -1;
3480
3481         while (pid != 0) {
3482                 pid = waitpid(-1, &status, WNOHANG);
3483                 if (pid == -1) {
3484                         if (errno != ECHILD) {
3485                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3486                         }
3487                         return;
3488                 }
3489                 if (pid > 0) {
3490                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3491                 }
3492         }
3493 }
3494
3495 /*
3496   start up the recovery daemon as a child of the main ctdb daemon
3497  */
3498 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3499 {
3500         int fd[2];
3501         struct signal_event *se;
3502
3503         if (pipe(fd) != 0) {
3504                 return -1;
3505         }
3506
3507         ctdb->ctdbd_pid = getpid();
3508
3509         ctdb->recoverd_pid = fork();
3510         if (ctdb->recoverd_pid == -1) {
3511                 return -1;
3512         }
3513         
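        /* parent (main ctdb daemon): close the read end of the pipe, keep the
           write end open so the child can detect when we exit, and start the
           periodic check that the recovery daemon is still running */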
3514         if (ctdb->recoverd_pid != 0) {
3515                 close(fd[0]);
3516                 event_add_timed(ctdb->ev, ctdb, 
3517                                 timeval_current_ofs(30, 0),
3518                                 ctdb_check_recd, ctdb);
3519                 return 0;
3520         }
3521
3522         close(fd[1]);
3523
3524         srandom(getpid() ^ time(NULL));
3525
3526         if (switch_from_server_to_client(ctdb) != 0) {
3527                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3528                 exit(1);
3529         }
3530
3531         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3532
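        /* watch the read end of the pipe: when the main daemon exits, the
           write end is closed, the fd becomes readable (EOF) and
           ctdb_recoverd_parent shuts this recovery daemon down */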
3533         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3534                      ctdb_recoverd_parent, &fd[0]);     
3535
3536         /* set up a handler to pick up sigchld */
3537         se = event_add_signal(ctdb->ev, ctdb,
3538                                      SIGCHLD, 0,
3539                                      recd_sig_child_handler,
3540                                      ctdb);
3541         if (se == NULL) {
3542                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3543                 exit(1);
3544         }
3545
3546         monitor_cluster(ctdb);
3547
3548         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3549         return -1;
3550 }
3551
3552 /*
3553   shutdown the recovery daemon
3554  */
3555 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3556 {
3557         if (ctdb->recoverd_pid == 0) {
3558                 return;
3559         }
3560
3561         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3562         kill(ctdb->recoverd_pid, SIGTERM);
3563 }