recovery: add special pull-logic for persistent databases
[metze/ctdb/wip.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
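/* Timeout helpers: CONTROL_TIMEOUT() is the deadline handed to individual
   controls sent by the recovery daemon (from the recover_timeout tunable);
   MONITOR_TIMEOUT() is used when scheduling the next monitoring pass
   (from the recover_interval tunable). */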
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes, 0,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit >= ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes, 0,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
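/* Callback for CTDB_CONTROL_GET_CAPABILITIES: each node replies with a 32-bit
   capability bitmask, cached in ctdb->nodes[pnn]->capabilities so later code
   can check e.g. which nodes may act as recmaster or lmaster. */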
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, 0,
236                                         CONTROL_TIMEOUT(),
237                                         false, tdb_null,
238                                         async_getcap_callback, NULL,
239                                         NULL) != 0) {
240                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
241                 talloc_free(tmp_ctx);
242                 return -1;
243         }
244
245         talloc_free(tmp_ctx);
246         return 0;
247 }
248
249 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
250 {
251         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
252
253         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
254         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
255 }
256
257 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
258 {
259         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
260
261         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
262         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
263 }
264
265 /*
266   change recovery mode on all nodes
267  */
268 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
269 {
270         TDB_DATA data;
271         uint32_t *nodes;
272         TALLOC_CTX *tmp_ctx;
273
274         tmp_ctx = talloc_new(ctdb);
275         CTDB_NO_MEMORY(ctdb, tmp_ctx);
276
277         /* freeze all nodes */
278         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
279         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
280                 int i;
281
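                /* Freeze the databases one priority level at a time
                   (1..NUM_DB_PRIORITIES); the level is passed to the FREEZE
                   control in the argument slot that is 0 for the other async
                   controls in this file, so every level must be requested
                   explicitly. */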
282                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
283                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
284                                                 nodes, i,
285                                                 CONTROL_TIMEOUT(),
286                                                 false, tdb_null,
287                                                 NULL,
288                                                 set_recmode_fail_callback,
289                                                 rec) != 0) {
290                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
291                                 talloc_free(tmp_ctx);
292                                 return -1;
293                         }
294                 }
295         }
296
297
298         data.dsize = sizeof(uint32_t);
299         data.dptr = (unsigned char *)&rec_mode;
300
301         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
302                                         nodes, 0,
303                                         CONTROL_TIMEOUT(),
304                                         false, data,
305                                         NULL, NULL,
306                                         NULL) != 0) {
307                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
308                 talloc_free(tmp_ctx);
309                 return -1;
310         }
311
312         talloc_free(tmp_ctx);
313         return 0;
314 }
315
316 /*
317   change the recovery master on all nodes
318  */
319 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
320 {
321         TDB_DATA data;
322         TALLOC_CTX *tmp_ctx;
323         uint32_t *nodes;
324
325         tmp_ctx = talloc_new(ctdb);
326         CTDB_NO_MEMORY(ctdb, tmp_ctx);
327
328         data.dsize = sizeof(uint32_t);
329         data.dptr = (unsigned char *)&pnn;
330
331         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
332         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
333                                         nodes, 0,
334                                         CONTROL_TIMEOUT(), false, data,
335                                         NULL, NULL,
336                                         NULL) != 0) {
337                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
338                 talloc_free(tmp_ctx);
339                 return -1;
340         }
341
342         talloc_free(tmp_ctx);
343         return 0;
344 }
345
346 /* update all remote nodes to use the same db priority that we have.
347    This can fail if the remote node has not yet been upgraded to
348    support this function, so we always return success and never fail
349    a recovery if this call fails.
350 */
351 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
352         struct ctdb_node_map *nodemap, 
353         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
354 {
355         int db;
356         uint32_t *nodes;
357
358         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
359
360         /* step through all local databases */
361         for (db=0; db<dbmap->num;db++) {
362                 TDB_DATA data;
363                 struct ctdb_db_priority db_prio;
364                 int ret;
365
366                 db_prio.db_id     = dbmap->dbs[db].dbid;
367                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
368                 if (ret != 0) {
369                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
370                         continue;
371                 }
372
373                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
374
375                 data.dptr  = (uint8_t *)&db_prio;
376                 data.dsize = sizeof(db_prio);
377
378                 if (ctdb_client_async_control(ctdb,
379                                         CTDB_CONTROL_SET_DB_PRIORITY,
380                                         nodes, 0,
381                                         CONTROL_TIMEOUT(), false, data,
382                                         NULL, NULL,
383                                         NULL) != 0) {
384                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
385                 }
386         }
387
388         return 0;
389 }                       
390
391 /*
392   ensure all other nodes have attached to any databases that we have
393  */
394 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
395                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
396 {
397         int i, j, db, ret;
398         struct ctdb_dbid_map *remote_dbmap;
399
400         /* verify that all other nodes have all our databases */
401         for (j=0; j<nodemap->num; j++) {
402                 /* we don't need to check ourselves */
403                 if (nodemap->nodes[j].pnn == pnn) {
404                         continue;
405                 }
406                 /* don't check nodes that are unavailable */
407                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
408                         continue;
409                 }
410
411                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
412                                          mem_ctx, &remote_dbmap);
413                 if (ret != 0) {
414                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
415                         return -1;
416                 }
417
418                 /* step through all local databases */
419                 for (db=0; db<dbmap->num;db++) {
420                         const char *name;
421
422
423                         for (i=0;i<remote_dbmap->num;i++) {
424                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
425                                         break;
426                                 }
427                         }
428                         /* the remote node already has this database */
429                         if (i!=remote_dbmap->num) {
430                                 continue;
431                         }
432                         /* ok so we need to create this database */
433                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
434                                             mem_ctx, &name);
435                         if (ret != 0) {
436                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
437                                 return -1;
438                         }
439                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
440                                            mem_ctx, name, dbmap->dbs[db].persistent);
441                         if (ret != 0) {
442                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
443                                 return -1;
444                         }
445                 }
446         }
447
448         return 0;
449 }
450
451
452 /*
453   ensure we are attached to any databases that anyone else is attached to
454  */
455 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
456                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
457 {
458         int i, j, db, ret;
459         struct ctdb_dbid_map *remote_dbmap;
460
461         /* verify that we have every database any other node has */
462         for (j=0; j<nodemap->num; j++) {
463                 /* we don't need to check ourselves */
464                 if (nodemap->nodes[j].pnn == pnn) {
465                         continue;
466                 }
467                 /* don't check nodes that are unavailable */
468                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
469                         continue;
470                 }
471
472                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
473                                          mem_ctx, &remote_dbmap);
474                 if (ret != 0) {
475                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
476                         return -1;
477                 }
478
479                 /* step through all databases on the remote node */
480                 for (db=0; db<remote_dbmap->num;db++) {
481                         const char *name;
482
483                         for (i=0;i<(*dbmap)->num;i++) {
484                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
485                                         break;
486                                 }
487                         }
488                         /* we already have this db locally */
489                         if (i!=(*dbmap)->num) {
490                                 continue;
491                         }
492                         /* ok so we need to create this database and
493                            rebuild dbmap
494                          */
495                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
496                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
497                         if (ret != 0) {
498                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
499                                           nodemap->nodes[j].pnn));
500                                 return -1;
501                         }
502                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
503                                            remote_dbmap->dbs[db].persistent);
504                         if (ret != 0) {
505                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
506                                 return -1;
507                         }
508                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
511                                 return -1;
512                         }
513                 }
514         }
515
516         return 0;
517 }
518
519
520 /*
521   pull the remote database contents from one node into the recdb
522  */
523 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
524                                     struct tdb_wrap *recdb, uint32_t dbid,
525                                     bool persistent)
526 {
527         int ret;
528         TDB_DATA outdata;
529         struct ctdb_marshall_buffer *reply;
530         struct ctdb_rec_data *rec;
531         int i;
532         int32_t transaction_active = 0;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552
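        /* Special handling for persistent databases: ask the source node
           whether it currently has a client transaction open on this db.
           Records pulled from such a node take precedence in the merge below
           regardless of rsn, presumably so an in-flight transaction is not
           lost during recovery; the result is remembered in transaction_active
           and flagged per record via the lacount field. */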
553         if (persistent) {
554                 transaction_active = ctdb_ctrl_transaction_active(ctdb, srcnode,
555                                                                   dbid);
556                 if (transaction_active == -1) {
557                         DEBUG(DEBUG_ERR, (__location__ " error calling "
558                                           "ctdb_ctrl_transaction_active to node"
559                                           " %u\n", srcnode));
560                         talloc_free(tmp_ctx);
561                         return -1;
562                 }
563         }
564         
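        /* Walk the marshalled records: each ctdb_rec_data entry is
           length-prefixed, holding keylen bytes of key followed by datalen
           bytes of data, and the data begins with a ctdb_ltdb_header. */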
565         for (i=0;
566              i<reply->count;
567              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
568                 TDB_DATA key, data;
569                 struct ctdb_ltdb_header *hdr;
570                 TDB_DATA existing;
571                 
572                 key.dptr = &rec->data[0];
573                 key.dsize = rec->keylen;
574                 data.dptr = &rec->data[key.dsize];
575                 data.dsize = rec->datalen;
576                 
577                 hdr = (struct ctdb_ltdb_header *)data.dptr;
578
579                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
580                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
581                         talloc_free(tmp_ctx);
582                         return -1;
583                 }
584
585                 /* fetch the existing record, if any */
586                 existing = tdb_fetch(recdb->tdb, key);
587                 
588                 if (existing.dptr != NULL) {
589                         struct ctdb_ltdb_header header;
590                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
591                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
592                                          (unsigned)existing.dsize, srcnode));
593                                 free(existing.dptr);
594                                 talloc_free(tmp_ctx);
595                                 return -1;
596                         }
597                         header = *(struct ctdb_ltdb_header *)existing.dptr;
598                         free(existing.dptr);
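                        /* Merge policy:
                           - volatile dbs: take the pulled record only if its
                             rsn is higher, or the rsns are equal and the stored
                             copy's dmaster is not the recovery master.
                           - persistent dbs: a stored copy marked as coming from
                             a node with an open transaction always wins;
                             otherwise take the pulled record if it has a higher
                             rsn or if this source node has an open transaction. */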
599                         if (!persistent) {
600                                 if (!(header.rsn < hdr->rsn ||
601                                     (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn)))
602                                 {
603                                         continue;
604                                 }
605                         } else {
606                                 if (header.lacount == (uint32_t)-1) {
607                                         /*
608                                          * skip record if the stored copy came
609                                          * from a node with active transaction
610                                          */
611                                         continue;
612                                 }
613
614                                 if ((header.rsn >= hdr->rsn) &&
615                                     !transaction_active)
616                                 {
617                                         continue;
618                                 }
619                         }
620                 }
621
622                 if (persistent) {
623                         /*
624                          * Misuse the lacount field to signal
625                          * that we got the record from a node
626                          * that has a transaction running.
627                          */
628                         if (transaction_active) {
629                                 hdr->lacount = (uint32_t)-1;
630                         } else {
631                                 hdr->lacount = 0;
632                         }
633                 }
634
635                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
636                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
637                         talloc_free(tmp_ctx);
638                         return -1;                              
639                 }
640         }
641
642         talloc_free(tmp_ctx);
643
644         return 0;
645 }
646
647 /*
648   pull all the remote database contents into the recdb
649  */
650 static int pull_remote_database(struct ctdb_context *ctdb,
651                                 struct ctdb_recoverd *rec, 
652                                 struct ctdb_node_map *nodemap, 
653                                 struct tdb_wrap *recdb, uint32_t dbid,
654                                 bool persistent)
655 {
656         int j;
657
658         /* pull all records from all other nodes across onto this node
659            (this merges based on rsn)
660         */
661         for (j=0; j<nodemap->num; j++) {
662                 /* don't merge from nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
667                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
668                                  nodemap->nodes[j].pnn));
669                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
670                         return -1;
671                 }
672         }
673         
674         return 0;
675 }
676
677
678 /*
679   update flags on all active nodes
680  */
681 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
682 {
683         int ret;
684
685         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
686         if (ret != 0) {
687                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
688                 return -1;
689         }
690
691         return 0;
692 }
693
694 /*
695   ensure all nodes have the same vnnmap we do
696  */
697 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
698                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
699 {
700         int j, ret;
701
702         /* push the new vnn map out to all the nodes */
703         for (j=0; j<nodemap->num; j++) {
704                 /* don't push to nodes that are unavailable */
705                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
706                         continue;
707                 }
708
709                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
710                 if (ret != 0) {
711                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
712                         return -1;
713                 }
714         }
715
716         return 0;
717 }
718
719
720 struct vacuum_info {
721         struct vacuum_info *next, *prev;
722         struct ctdb_recoverd *rec;
723         uint32_t srcnode;
724         struct ctdb_db_context *ctdb_db;
725         struct ctdb_marshall_buffer *recs;
726         struct ctdb_rec_data *r;
727 };
728
729 static void vacuum_fetch_next(struct vacuum_info *v);
730
731 /*
732   called when a vacuum fetch has completed - just free it and do the next one
733  */
734 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
735 {
736         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
737         talloc_free(state);
738         vacuum_fetch_next(v);
739 }
740
741
742 /*
743   process the next element from the vacuum list
744 */
745 static void vacuum_fetch_next(struct vacuum_info *v)
746 {
747         struct ctdb_call call;
748         struct ctdb_rec_data *r;
749
750         while (v->recs->count) {
751                 struct ctdb_client_call_state *state;
752                 TDB_DATA data;
753                 struct ctdb_ltdb_header *hdr;
754
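                /* A dummy call (CTDB_NULL_FUNC) with CTDB_IMMEDIATE_MIGRATION
                   set is issued for each key: the call itself does no work,
                   but it forces the record to be migrated to this node, which
                   is how the vacuuming code pulls records back here without a
                   real client request. */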
755                 ZERO_STRUCT(call);
756                 call.call_id = CTDB_NULL_FUNC;
757                 call.flags = CTDB_IMMEDIATE_MIGRATION;
758
759                 r = v->r;
760                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
761                 v->recs->count--;
762
763                 call.key.dptr = &r->data[0];
764                 call.key.dsize = r->keylen;
765
766                 /* ensure we don't block this daemon - just skip a record if we can't get
767                    the chainlock */
768                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
769                         continue;
770                 }
771
772                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
773                 if (data.dptr == NULL) {
774                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
775                         continue;
776                 }
777
778                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
779                         free(data.dptr);
780                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
781                         continue;
782                 }
783                 
784                 hdr = (struct ctdb_ltdb_header *)data.dptr;
785                 if (hdr->dmaster == v->rec->ctdb->pnn) {
786                         /* it's already local */
787                         free(data.dptr);
788                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
789                         continue;
790                 }
791
792                 free(data.dptr);
793
794                 state = ctdb_call_send(v->ctdb_db, &call);
795                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
796                 if (state == NULL) {
797                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
798                         talloc_free(v);
799                         return;
800                 }
801                 state->async.fn = vacuum_fetch_callback;
802                 state->async.private_data = v;
803                 return;
804         }
805
806         talloc_free(v);
807 }
808
809
810 /*
811   destroy a vacuum info structure
812  */
813 static int vacuum_info_destructor(struct vacuum_info *v)
814 {
815         DLIST_REMOVE(v->rec->vacuum_info, v);
816         return 0;
817 }
818
819
820 /*
821   handler for vacuum fetch
822 */
823 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
824                                  TDB_DATA data, void *private_data)
825 {
826         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
827         struct ctdb_marshall_buffer *recs;
828         int ret, i;
829         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
830         const char *name;
831         struct ctdb_dbid_map *dbmap=NULL;
832         bool persistent = false;
833         struct ctdb_db_context *ctdb_db;
834         struct ctdb_rec_data *r;
835         uint32_t srcnode;
836         struct vacuum_info *v;
837
838         recs = (struct ctdb_marshall_buffer *)data.dptr;
839         r = (struct ctdb_rec_data *)&recs->data[0];
840
841         if (recs->count == 0) {
842                 talloc_free(tmp_ctx);
843                 return;
844         }
845
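        /* The reqid field of the first marshalled record carries the pnn of
           the node that sent this vacuum-fetch message; use it as the source
           node. */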
846         srcnode = r->reqid;
847
848         for (v=rec->vacuum_info;v;v=v->next) {
849                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
850                         /* we're already working on records from this node */
851                         talloc_free(tmp_ctx);
852                         return;
853                 }
854         }
855
856         /* work out if the database is persistent */
857         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
858         if (ret != 0) {
859                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
860                 talloc_free(tmp_ctx);
861                 return;
862         }
863
864         for (i=0;i<dbmap->num;i++) {
865                 if (dbmap->dbs[i].dbid == recs->db_id) {
866                         persistent = dbmap->dbs[i].persistent;
867                         break;
868                 }
869         }
870         if (i == dbmap->num) {
871                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
872                 talloc_free(tmp_ctx);
873                 return;         
874         }
875
876         /* find the name of this database */
877         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
878                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
879                 talloc_free(tmp_ctx);
880                 return;
881         }
882
883         /* attach to it */
884         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
885         if (ctdb_db == NULL) {
886                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
887                 talloc_free(tmp_ctx);
888                 return;
889         }
890
891         v = talloc_zero(rec, struct vacuum_info);
892         if (v == NULL) {
893                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
894                 talloc_free(tmp_ctx);
895                 return;
896         }
897
898         v->rec = rec;
899         v->srcnode = srcnode;
900         v->ctdb_db = ctdb_db;
901         v->recs = talloc_memdup(v, recs, data.dsize);
902         if (v->recs == NULL) {
903                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
904                 talloc_free(v);
905                 talloc_free(tmp_ctx);
906                 return;         
907         }
908         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
909
910         DLIST_ADD(rec->vacuum_info, v);
911
912         talloc_set_destructor(v, vacuum_info_destructor);
913
914         vacuum_fetch_next(v);
915         talloc_free(tmp_ctx);
916 }
917
918
919 /*
920   called when ctdb_wait_timeout should finish
921  */
922 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
923                               struct timeval yt, void *p)
924 {
925         uint32_t *timed_out = (uint32_t *)p;
926         (*timed_out) = 1;
927 }
928
929 /*
930   wait for a given number of seconds
931  */
932 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
933 {
934         uint32_t timed_out = 0;
935         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
936         while (!timed_out) {
937                 event_loop_once(ctdb->ev);
938         }
939 }
940
941 /*
942   called when an election times out (ends)
943  */
944 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
945                                   struct timeval t, void *p)
946 {
947         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
948         rec->election_timeout = NULL;
949
950         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
951 }
952
953
954 /*
955   wait for an election to finish. It finishes election_timeout seconds after
956   the last election packet is received
957  */
958 static void ctdb_wait_election(struct ctdb_recoverd *rec)
959 {
960         struct ctdb_context *ctdb = rec->ctdb;
961         while (rec->election_timeout) {
962                 event_loop_once(ctdb->ev);
963         }
964 }
965
966 /*
967   Update our local flags from all remote connected nodes.
968   This is only run when we are, or believe we are, the recovery master
969  */
970 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
971 {
972         int j;
973         struct ctdb_context *ctdb = rec->ctdb;
974         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
975
976         /* get the nodemap for all active remote nodes and verify
977            they are the same as for this node
978          */
979         for (j=0; j<nodemap->num; j++) {
980                 struct ctdb_node_map *remote_nodemap=NULL;
981                 int ret;
982
983                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
984                         continue;
985                 }
986                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
987                         continue;
988                 }
989
990                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
991                                            mem_ctx, &remote_nodemap);
992                 if (ret != 0) {
993                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
994                                   nodemap->nodes[j].pnn));
995                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
996                         talloc_free(mem_ctx);
997                         return MONITOR_FAILED;
998                 }
999                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1000                         /* We should tell our daemon about this so it
1001                            updates its flags or else we will log the same 
1002                            message again in the next iteration of recovery.
1003                            Since we are the recovery master we can just as
1004                            well update the flags on all nodes.
1005                         */
1006                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1007                         if (ret != 0) {
1008                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1009                                 return MONITOR_FAILED;
1010                         }
1011
1012                         /* Update our local copy of the flags in the recovery
1013                            daemon.
1014                         */
1015                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1016                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1017                                  nodemap->nodes[j].flags));
1018                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1019                 }
1020                 talloc_free(remote_nodemap);
1021         }
1022         talloc_free(mem_ctx);
1023         return MONITOR_OK;
1024 }
1025
1026
1027 /* Create a new random generation id.
1028    The generation id cannot be the INVALID_GENERATION id
1029 */
1030 static uint32_t new_generation(void)
1031 {
1032         uint32_t generation;
1033
1034         while (1) {
1035                 generation = random();
1036
1037                 if (generation != INVALID_GENERATION) {
1038                         break;
1039                 }
1040         }
1041
1042         return generation;
1043 }
1044
1045
1046 /*
1047   create a temporary working database
1048  */
1049 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1050 {
1051         char *name;
1052         struct tdb_wrap *recdb;
1053         unsigned tdb_flags;
1054
1055         /* open up the temporary recovery database */
1056         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1057         if (name == NULL) {
1058                 return NULL;
1059         }
1060         unlink(name);
1061
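        /* Only the recovery daemon touches this scratch file, so tdb locking
           is unnecessary (TDB_NOLOCK); the file is recreated from scratch
           (unlinked above) every time it is opened. */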
1062         tdb_flags = TDB_NOLOCK;
1063         if (!ctdb->do_setsched) {
1064                 tdb_flags |= TDB_NOMMAP;
1065         }
1066
1067         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1068                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1069         if (recdb == NULL) {
1070                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1071         }
1072
1073         talloc_free(name);
1074
1075         return recdb;
1076 }
1077
1078
1079 /* 
1080    a traverse function for pulling all relevant records from recdb
1081  */
1082 struct recdb_data {
1083         struct ctdb_context *ctdb;
1084         struct ctdb_marshall_buffer *recdata;
1085         uint32_t len;
1086         bool failed;
1087         bool persistent;
1088 };
1089
1090 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1091 {
1092         struct recdb_data *params = (struct recdb_data *)p;
1093         struct ctdb_rec_data *rec;
1094         struct ctdb_ltdb_header *hdr;
1095
1096         /* skip empty records */
1097         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1098                 return 0;
1099         }
1100
1101         /* for non-persistent databases, update the dmaster field to point to us */
1102         hdr = (struct ctdb_ltdb_header *)data.dptr;
1103         if (!params->persistent) {
1104                 hdr->dmaster = params->ctdb->pnn;
1105         } else {
1106                 /*
1107                  * Clear the lacount field that had been misused
1108                  * when pulling the db in order to keep track of
1109                  * whether the node had a transaction running.
1110                  */
1111                 hdr->lacount = 0;
1112         }
1113
1114         /* add the record to the blob ready to send to the nodes */
1115         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1116         if (rec == NULL) {
1117                 params->failed = true;
1118                 return -1;
1119         }
1120         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1121         if (params->recdata == NULL) {
1122                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1123                          rec->length + params->len, params->recdata->count));
1124                 params->failed = true;
1125                 return -1;
1126         }
1127         params->recdata->count++;
1128         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1129         params->len += rec->length;
1130         talloc_free(rec);
1131
1132         return 0;
1133 }
1134
1135 /*
1136   push the recdb database out to all nodes
1137  */
1138 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1139                                bool persistent,
1140                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1141 {
1142         struct recdb_data params;
1143         struct ctdb_marshall_buffer *recdata;
1144         TDB_DATA outdata;
1145         TALLOC_CTX *tmp_ctx;
1146         uint32_t *nodes;
1147
1148         tmp_ctx = talloc_new(ctdb);
1149         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1150
1151         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1152         CTDB_NO_MEMORY(ctdb, recdata);
1153
1154         recdata->db_id = dbid;
1155
1156         params.ctdb = ctdb;
1157         params.recdata = recdata;
1158         params.len = offsetof(struct ctdb_marshall_buffer, data);
1159         params.failed = false;
1160         params.persistent = persistent;
1161
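        /* Build one marshall buffer from every record left in the scratch
           recovery db: traverse_recdb() appends each record and grows the
           buffer, and the result is then pushed to all active nodes with a
           single CTDB_CONTROL_PUSH_DB per node. */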
1162         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1163                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1164                 talloc_free(params.recdata);
1165                 talloc_free(tmp_ctx);
1166                 return -1;
1167         }
1168
1169         if (params.failed) {
1170                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1171                 talloc_free(params.recdata);
1172                 talloc_free(tmp_ctx);
1173                 return -1;              
1174         }
1175
1176         recdata = params.recdata;
1177
1178         outdata.dptr = (void *)recdata;
1179         outdata.dsize = params.len;
1180
1181         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1182         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1183                                         nodes, 0,
1184                                         CONTROL_TIMEOUT(), false, outdata,
1185                                         NULL, NULL,
1186                                         NULL) != 0) {
1187                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1188                 talloc_free(recdata);
1189                 talloc_free(tmp_ctx);
1190                 return -1;
1191         }
1192
1193         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n", 
1194                   dbid, recdata->count));
1195
1196         talloc_free(recdata);
1197         talloc_free(tmp_ctx);
1198
1199         return 0;
1200 }
1201
1202
1203 /*
1204   go through a full recovery on one database 
1205  */
1206 static int recover_database(struct ctdb_recoverd *rec, 
1207                             TALLOC_CTX *mem_ctx,
1208                             uint32_t dbid,
1209                             bool persistent,
1210                             uint32_t pnn, 
1211                             struct ctdb_node_map *nodemap,
1212                             uint32_t transaction_id)
1213 {
1214         struct tdb_wrap *recdb;
1215         int ret;
1216         struct ctdb_context *ctdb = rec->ctdb;
1217         TDB_DATA data;
1218         struct ctdb_control_wipe_database w;
1219         uint32_t *nodes;
1220
1221         recdb = create_recdb(ctdb, mem_ctx);
1222         if (recdb == NULL) {
1223                 return -1;
1224         }
1225
1226         /* pull all remote databases onto the recdb */
1227         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1228         if (ret != 0) {
1229                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1230                 return -1;
1231         }
1232
1233         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1234
1235         /* wipe all the remote databases. This is safe as we are in a transaction */
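        /* transaction_id is the new generation number chosen in do_recovery();
           the same value was sent with CTDB_CONTROL_TRANSACTION_START, so the
           wipe and the following push happen inside that recovery
           transaction. */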
1236         w.db_id = dbid;
1237         w.transaction_id = transaction_id;
1238
1239         data.dptr = (void *)&w;
1240         data.dsize = sizeof(w);
1241
1242         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1243         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1244                                         nodes, 0,
1245                                         CONTROL_TIMEOUT(), false, data,
1246                                         NULL, NULL,
1247                                         NULL) != 0) {
1248                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1249                 talloc_free(recdb);
1250                 return -1;
1251         }
1252         
1253         /* push out the correct database. This sets the dmaster and skips 
1254            the empty records */
1255         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1256         if (ret != 0) {
1257                 talloc_free(recdb);
1258                 return -1;
1259         }
1260
1261         /* all done with this database */
1262         talloc_free(recdb);
1263
1264         return 0;
1265 }
1266
1267 /*
1268   reload the nodes file 
1269 */
1270 static void reload_nodes_file(struct ctdb_context *ctdb)
1271 {
1272         ctdb->nodes = NULL;
1273         ctdb_load_nodes_file(ctdb);
1274 }
1275
1276         
1277 /*
1278   we are the recmaster, and recovery is needed - start a recovery run
1279  */
1280 static int do_recovery(struct ctdb_recoverd *rec, 
1281                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1282                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1283 {
1284         struct ctdb_context *ctdb = rec->ctdb;
1285         int i, j, ret;
1286         uint32_t generation;
1287         struct ctdb_dbid_map *dbmap;
1288         TDB_DATA data;
1289         uint32_t *nodes;
1290         struct timeval start_time;
1291
1292         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1293
1294         /* if recovery fails, force it again */
1295         rec->need_recovery = true;
1296
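        /* Ban nodes that have accumulated too many "culprit" credits: each
           time a node causes a recovery or fails a recovery step it is charged
           credits via ctdb_set_culprit*(); once the total reaches twice the
           number of nodes, the node is banned for recovery_ban_period seconds
           and its counter is reset. */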
1297         for (i=0; i<ctdb->num_nodes; i++) {
1298                 struct ctdb_banning_state *ban_state;
1299
1300                 if (ctdb->nodes[i]->ban_state == NULL) {
1301                         continue;
1302                 }
1303                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1304                 if (ban_state->count < 2*ctdb->num_nodes) {
1305                         continue;
1306                 }
1307                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1308                         ctdb->nodes[i]->pnn, ban_state->count,
1309                         ctdb->tunable.recovery_ban_period));
1310                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1311                 ban_state->count = 0;
1312         }
1313
1314
1315         if (ctdb->tunable.verify_recovery_lock != 0) {
1316                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1317                 start_time = timeval_current();
1318                 if (!ctdb_recovery_lock(ctdb, true)) {
1319                         ctdb_set_culprit(rec, pnn);
1320                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1321                         return -1;
1322                 }
1323                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1324                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1325         }
1326
1327         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1328
1329         /* get a list of all databases */
1330         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1331         if (ret != 0) {
1332                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1333                 return -1;
1334         }
1335
1336         /* we do the db creation before we set the recovery mode, so the freeze happens
1337            on all databases we will be dealing with. */
1338
1339         /* verify that we have all the databases any other node has */
1340         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1341         if (ret != 0) {
1342                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1343                 return -1;
1344         }
1345
1346         /* verify that all other nodes have all our databases */
1347         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1348         if (ret != 0) {
1349                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1350                 return -1;
1351         }
1352         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1353
1354         /* update the database priority for all remote databases */
1355         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1356         if (ret != 0) {
1357                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1358         }
1359         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1360
1361
1362         /* set recovery mode to active on all nodes */
1363         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1364         if (ret != 0) {
1365                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1366                 return -1;
1367         }
1368
1369         /* execute the "startrecovery" event script on all nodes */
1370         ret = run_startrecovery_eventscript(rec, nodemap);
1371         if (ret!=0) {
1372                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1373                 return -1;
1374         }
1375
1376         /* pick a new generation number */
1377         generation = new_generation();
1378
1379         /* change the vnnmap on this node to use the new generation 
1380            number but not on any other nodes.
1381            this guarantees that if we abort the recovery prematurely
1382            for some reason (a node stops responding?)
1383            that we can just return immediately and we will reenter
1384            recovery shortly again.
1385            I.e. we deliberately leave the cluster with an inconsistent
1386            generation id to allow us to abort recovery at any stage and
1387            just restart it from scratch.
1388          */
1389         vnnmap->generation = generation;
1390         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1391         if (ret != 0) {
1392                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1393                 return -1;
1394         }
1395
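             /* the new generation number is the payload for the
                TRANSACTION_START control below, so every node opens its
                recovery transaction under the same generation */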
1396         data.dptr = (void *)&generation;
1397         data.dsize = sizeof(uint32_t);
1398
1399         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1400         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1401                                         nodes, 0,
1402                                         CONTROL_TIMEOUT(), false, data,
1403                                         NULL,
1404                                         transaction_start_fail_callback,
1405                                         rec) != 0) {
1406                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1407                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1408                                         nodes, 0,
1409                                         CONTROL_TIMEOUT(), false, tdb_null,
1410                                         NULL,
1411                                         NULL,
1412                                         NULL) != 0) {
1413                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1414                 }
1415                 return -1;
1416         }
1417
1418         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1419
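             /* recover each database in turn; the persistent flag from the
                dbmap is passed down so recover_database() can treat
                persistent and normal (volatile) databases differently */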
1420         for (i=0;i<dbmap->num;i++) {
1421                 ret = recover_database(rec, mem_ctx,
1422                                        dbmap->dbs[i].dbid,
1423                                        dbmap->dbs[i].persistent,
1424                                        pnn, nodemap, generation);
1425                 if (ret != 0) {
1426                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1427                         return -1;
1428                 }
1429         }
1430
1431         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1432
1433         /* commit all the changes */
1434         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1435                                         nodes, 0,
1436                                         CONTROL_TIMEOUT(), false, data,
1437                                         NULL, NULL,
1438                                         NULL) != 0) {
1439                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1440                 return -1;
1441         }
1442
1443         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1444         
1445
1446         /* update the capabilities for all nodes */
1447         ret = update_capabilities(ctdb, nodemap);
1448         if (ret!=0) {
1449                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1450                 return -1;
1451         }
1452
1453         /* build a new vnn map with all the currently active and
1454            unbanned nodes */
1455         generation = new_generation();
1456         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1457         CTDB_NO_MEMORY(ctdb, vnnmap);
1458         vnnmap->generation = generation;
1459         vnnmap->size = 0;
1460         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1461         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1462         for (i=j=0;i<nodemap->num;i++) {
1463                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1464                         continue;
1465                 }
1466                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1467                         /* this node cannot be an lmaster */
1468                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1469                         continue;
1470                 }
1471
1472                 vnnmap->size++;
1473                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1474                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1475                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1476
1477         }
1478         if (vnnmap->size == 0) {
1479                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1480                 vnnmap->size++;
1481                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1482                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1483                 vnnmap->map[0] = pnn;
1484         }       
1485
1486         /* update to the new vnnmap on all nodes */
1487         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1488         if (ret != 0) {
1489                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1490                 return -1;
1491         }
1492
1493         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1494
1495         /* update recmaster to point to us for all nodes */
1496         ret = set_recovery_master(ctdb, nodemap, pnn);
1497         if (ret!=0) {
1498                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1499                 return -1;
1500         }
1501
1502         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1503
1504         /*
1505           update all nodes to have the same flags that we have
1506          */
1507         for (i=0;i<nodemap->num;i++) {
1508                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1509                         continue;
1510                 }
1511
1512                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1513                 if (ret != 0) {
1514                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1515                         return -1;
1516                 }
1517         }
1518
1519         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1520
1521         /* disable recovery mode */
1522         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1523         if (ret != 0) {
1524                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1525                 return -1;
1526         }
1527
1528         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1529
1530         /*
1531           tell nodes to takeover their public IPs
1532          */
1533         rec->need_takeover_run = false;
1534         ret = ctdb_takeover_run(ctdb, nodemap);
1535         if (ret != 0) {
1536                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1537                 return -1;
1538         }
1539         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1540
1541         /* execute the "recovered" event script on all nodes */
1542         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1543         if (ret!=0) {
1544                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1545                 return -1;
1546         }
1547
1548         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1549
1550         /* send a message to all clients telling them that the cluster 
1551            has been reconfigured */
1552         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1553
1554         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1555
1556         rec->need_recovery = false;
1557
1558         /* we managed to complete a full recovery, make sure to forgive
1559            any past sins by the nodes that could now participate in the
1560            recovery.
1561         */
1562         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1563         for (i=0;i<nodemap->num;i++) {
1564                 struct ctdb_banning_state *ban_state;
1565
1566                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1567                         continue;
1568                 }
1569
1570                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1571                 if (ban_state == NULL) {
1572                         continue;
1573                 }
1574
1575                 ban_state->count = 0;
1576         }
1577
1578
1579         /* We just finished a recovery successfully. 
1580            We now wait for rerecovery_timeout before we allow 
1581            another recovery to take place.
1582         */
1583         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1584         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1585         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1586
1587         return 0;
1588 }
1589
1590
1591 /*
1592   elections are won by first checking the number of connected nodes, then
1593   the priority time, then the pnn
1594  */
1595 struct election_message {
1596         uint32_t num_connected;
1597         struct timeval priority_time;
1598         uint32_t pnn;
1599         uint32_t node_flags;
1600 };
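     /* note: priority_time is the time this node's recovery daemon started,
        so when the connected-node counts tie the longest running daemon wins;
        node_flags lets the comparison rule out banned or stopped nodes */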
1601
1602 /*
1603   form this node's election data
1604  */
1605 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1606 {
1607         int ret, i;
1608         struct ctdb_node_map *nodemap;
1609         struct ctdb_context *ctdb = rec->ctdb;
1610
1611         ZERO_STRUCTP(em);
1612
1613         em->pnn = rec->ctdb->pnn;
1614         em->priority_time = rec->priority_time;
1615
1616         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1617         if (ret != 0) {
1618                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1619                 return;
1620         }
1621
1622         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1623         em->node_flags = rec->node_flags;
1624
1625         for (i=0;i<nodemap->num;i++) {
1626                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1627                         em->num_connected++;
1628                 }
1629         }
1630
1631         /* we shouldn't try to win this election if we can't be a recmaster */
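             /* reporting zero connected nodes and a "just started" priority
                time makes our election data compare worse than that of any
                node which does have the recmaster capability */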
1632         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1633                 em->num_connected = 0;
1634                 em->priority_time = timeval_current();
1635         }
1636
1637         talloc_free(nodemap);
1638 }
1639
1640 /*
1641   see if the given election data wins
1642  */
1643 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1644 {
1645         struct election_message myem;
1646         int cmp = 0;
1647
1648         ctdb_election_data(rec, &myem);
1649
1650         /* we can't win if we don't have the recmaster capability */
1651         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1652                 return false;
1653         }
1654
1655         /* we can't win if we are banned */
1656         if (rec->node_flags & NODE_FLAGS_BANNED) {
1657                 return false;
1658         }       
1659
1660         /* we can't win if we are stopped */
1661         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1662                 return false;
1663         }       
1664
1665         /* we will automatically win if the other node is banned */
1666         if (em->node_flags & NODE_FLAGS_BANNED) {
1667                 return true;
1668         }
1669
1670         /* we will automatically win if the other node is stopped */
1671         if (em->node_flags & NODE_FLAGS_STOPPED) {
1672                 return true;
1673         }
1674
1675         /* try to use the most connected node */
1676         if (cmp == 0) {
1677                 cmp = (int)myem.num_connected - (int)em->num_connected;
1678         }
1679
1680         /* then the longest running node */
1681         if (cmp == 0) {
1682                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1683         }
1684
1685         if (cmp == 0) {
1686                 cmp = (int)myem.pnn - (int)em->pnn;
1687         }
1688
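             /* a positive cmp means our own election data beats the
                sender's, so we should contest the election */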
1689         return cmp > 0;
1690 }
1691
1692 /*
1693   send out an election request
1694  */
1695 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1696 {
1697         int ret;
1698         TDB_DATA election_data;
1699         struct election_message emsg;
1700         uint64_t srvid;
1701         struct ctdb_context *ctdb = rec->ctdb;
1702
1703         srvid = CTDB_SRVID_RECOVERY;
1704
1705         ctdb_election_data(rec, &emsg);
1706
1707         election_data.dsize = sizeof(struct election_message);
1708         election_data.dptr  = (unsigned char *)&emsg;
1709
1710
1711         /* send an election message to all active nodes */
1712         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1713         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1714
1715
1716         /* A new node that is already frozen has entered the cluster.
1717            The existing nodes are not frozen and don't need to be frozen
1718            until the election has ended and we start the actual recovery
1719         */
1720         if (update_recmaster == true) {
1721                 /* first we assume we will win the election and set 
1722                    recoverymaster to be ourself on the current node
1723                  */
1724                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1725                 if (ret != 0) {
1726                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1727                         return -1;
1728                 }
1729         }
1730
1731
1732         return 0;
1733 }
1734
1735 /*
1736   this function will unban all nodes in the cluster
1737 */
1738 static void unban_all_nodes(struct ctdb_context *ctdb)
1739 {
1740         int ret, i;
1741         struct ctdb_node_map *nodemap;
1742         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1743         
1744         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1745         if (ret != 0) {
1746                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1747                 return;
1748         }
1749
1750         for (i=0;i<nodemap->num;i++) {
1751                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1752                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1753                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1754                 }
1755         }
1756
1757         talloc_free(tmp_ctx);
1758 }
1759
1760
1761 /*
1762   we think we are winning the election - send a broadcast election request
1763  */
1764 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1765 {
1766         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1767         int ret;
1768
1769         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1770         if (ret != 0) {
1771                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1772         }
1773
1774         talloc_free(rec->send_election_te);
1775         rec->send_election_te = NULL;
1776 }
1777
1778 /*
1779   handler for memory dumps
1780 */
1781 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1782                              TDB_DATA data, void *private_data)
1783 {
1784         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1785         TDB_DATA *dump;
1786         int ret;
1787         struct rd_memdump_reply *rd;
1788
1789         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1790                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1791                 talloc_free(tmp_ctx);
1792                 return;
1793         }
1794         rd = (struct rd_memdump_reply *)data.dptr;
1795
1796         dump = talloc_zero(tmp_ctx, TDB_DATA);
1797         if (dump == NULL) {
1798                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1799                 talloc_free(tmp_ctx);
1800                 return;
1801         }
1802         ret = ctdb_dump_memory(ctdb, dump);
1803         if (ret != 0) {
1804                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1805                 talloc_free(tmp_ctx);
1806                 return;
1807         }
1808
1809         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1810
1811         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1812         if (ret != 0) {
1813                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1814                 talloc_free(tmp_ctx);
1815                 return;
1816         }
1817
1818         talloc_free(tmp_ctx);
1819 }
1820
1821 /*
1822   handler for reload_nodes
1823 */
1824 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1825                              TDB_DATA data, void *private_data)
1826 {
1827         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1828
1829         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1830
1831         reload_nodes_file(rec->ctdb);
1832 }
1833
1834
1835 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1836                               struct timeval yt, void *p)
1837 {
1838         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1839
1840         talloc_free(rec->ip_check_disable_ctx);
1841         rec->ip_check_disable_ctx = NULL;
1842 }
1843
1844 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1845                              TDB_DATA data, void *private_data)
1846 {
1847         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1848         uint32_t timeout;
1849
1850         if (rec->ip_check_disable_ctx != NULL) {
1851                 talloc_free(rec->ip_check_disable_ctx);
1852                 rec->ip_check_disable_ctx = NULL;
1853         }
1854
1855         if (data.dsize != sizeof(uint32_t)) {
1856                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
1857                                  "expecting %lu\n", (long unsigned)data.dsize,
1858                                  (long unsigned)sizeof(uint32_t)));
1859                 return;
1860         }
1861         if (data.dptr == NULL) {
1862                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1863                 return;
1864         }
1865
1866         timeout = *((uint32_t *)data.dptr);
1867         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1868
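             /* the timed event below is parented to ip_check_disable_ctx;
                a non-NULL context is what marks the ip check as disabled,
                and freeing it (in reenable_ip_check, or when a new request
                arrives above) cancels the timer and re-enables the check */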
1869         rec->ip_check_disable_ctx = talloc_new(rec);
1870         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1871
1872         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1873 }
1874
1875
1876 /*
1877   handler for ip reallocate, just add it to the list of callers and 
1878   handle this later in the monitor_cluster loop so we do not recurse
1879   with other callers to takeover_run()
1880 */
1881 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1882                              TDB_DATA data, void *private_data)
1883 {
1884         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1885         struct ip_reallocate_list *caller;
1886
1887         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1888                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1889                 return;
1890         }
1891
1892         if (rec->ip_reallocate_ctx == NULL) {
1893                 rec->ip_reallocate_ctx = talloc_new(rec);
1894                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1895         }
1896
1897         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1898         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1899
1900         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1901         caller->next = rec->reallocate_callers;
1902         rec->reallocate_callers = caller;
1903
1904         return;
1905 }
1906
1907 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1908 {
1909         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1910         TDB_DATA result;
1911         int32_t ret;
1912         struct ip_reallocate_list *callers;
1913
1914         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1915         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1916         result.dsize = sizeof(int32_t);
1917         result.dptr  = (uint8_t *)&ret;
1918
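             /* reply to every queued caller with the int32_t result of the
                takeover run; callers that passed srvid == 0 asked for no
                reply */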
1919         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1920
1921                 /* Someone that sent srvid==0 does not want a reply */
1922                 if (callers->rd->srvid == 0) {
1923                         continue;
1924                 }
1925                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1926                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1927                                   (unsigned long long)callers->rd->srvid));
1928                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1929                 if (ret != 0) {
1930                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1931                                          "message to %u:%llu\n",
1932                                          (unsigned)callers->rd->pnn,
1933                                          (unsigned long long)callers->rd->srvid));
1934                 }
1935         }
1936
1937         talloc_free(tmp_ctx);
1938         talloc_free(rec->ip_reallocate_ctx);
1939         rec->ip_reallocate_ctx = NULL;
1940         rec->reallocate_callers = NULL;
1941         
1942 }
1943
1944
1945 /*
1946   handler for recovery master elections
1947 */
1948 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1949                              TDB_DATA data, void *private_data)
1950 {
1951         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1952         int ret;
1953         struct election_message *em = (struct election_message *)data.dptr;
1954         TALLOC_CTX *mem_ctx;
1955
1956         /* we got an election packet - update the timeout for the election */
1957         talloc_free(rec->election_timeout);
1958         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1959                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1960                                                 ctdb_election_timeout, rec);
1961
1962         mem_ctx = talloc_new(ctdb);
1963
1964         /* someone called an election. check their election data
1965            and if we disagree and we would rather be the elected node, 
1966            send a new election message to all other nodes
1967          */
1968         if (ctdb_election_win(rec, em)) {
1969                 if (!rec->send_election_te) {
1970                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1971                                                                 timeval_current_ofs(0, 500000),
1972                                                                 election_send_request, rec);
1973                 }
1974                 talloc_free(mem_ctx);
1975                 /*unban_all_nodes(ctdb);*/
1976                 return;
1977         }
1978         
1979         /* we didn't win */
1980         talloc_free(rec->send_election_te);
1981         rec->send_election_te = NULL;
1982
1983         if (ctdb->tunable.verify_recovery_lock != 0) {
1984                 /* release the recmaster lock */
1985                 if (em->pnn != ctdb->pnn &&
1986                     ctdb->recovery_lock_fd != -1) {
1987                         close(ctdb->recovery_lock_fd);
1988                         ctdb->recovery_lock_fd = -1;
1989                         unban_all_nodes(ctdb);
1990                 }
1991         }
1992
1993         /* ok, let that guy become recmaster then */
1994         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1995         if (ret != 0) {
1996                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1997                 talloc_free(mem_ctx);
1998                 return;
1999         }
2000
2001         talloc_free(mem_ctx);
2002         return;
2003 }
2004
2005
2006 /*
2007   force the start of the election process
2008  */
2009 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2010                            struct ctdb_node_map *nodemap)
2011 {
2012         int ret;
2013         struct ctdb_context *ctdb = rec->ctdb;
2014
2015         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2016
2017         /* set all nodes to recovery mode to stop all internode traffic */
2018         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2019         if (ret != 0) {
2020                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2021                 return;
2022         }
2023
2024         talloc_free(rec->election_timeout);
2025         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2026                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2027                                                 ctdb_election_timeout, rec);
2028
2029         ret = send_election_request(rec, pnn, true);
2030         if (ret!=0) {
2031                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2032                 return;
2033         }
2034
2035         /* wait for a few seconds to collect all responses */
2036         ctdb_wait_election(rec);
2037 }
2038
2039
2040
2041 /*
2042   handler for when a node changes its flags
2043 */
2044 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2045                             TDB_DATA data, void *private_data)
2046 {
2047         int ret;
2048         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2049         struct ctdb_node_map *nodemap=NULL;
2050         TALLOC_CTX *tmp_ctx;
2051         uint32_t changed_flags;
2052         int i;
2053         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2054         int disabled_flag_changed;
2055
2056         if (data.dsize != sizeof(*c)) {
2057                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2058                 return;
2059         }
2060
2061         tmp_ctx = talloc_new(ctdb);
2062         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2063
2064         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2065         if (ret != 0) {
2066                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2067                 talloc_free(tmp_ctx);
2068                 return;         
2069         }
2070
2071
2072         for (i=0;i<nodemap->num;i++) {
2073                 if (nodemap->nodes[i].pnn == c->pnn) break;
2074         }
2075
2076         if (i == nodemap->num) {
2077                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2078                 talloc_free(tmp_ctx);
2079                 return;
2080         }
2081
2082         changed_flags = c->old_flags ^ c->new_flags;
2083
2084         if (nodemap->nodes[i].flags != c->new_flags) {
2085                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2086         }
2087
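             /* compare our cached flags with the new ones, but only for the
                disabled/unhealthy bits - only those transitions need an ip
                takeover run without a full recovery (see below) */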
2088         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2089
2090         nodemap->nodes[i].flags = c->new_flags;
2091
2092         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2093                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2094
2095         if (ret == 0) {
2096                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2097                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2098         }
2099         
2100         if (ret == 0 &&
2101             ctdb->recovery_master == ctdb->pnn &&
2102             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2103                 /* Only do the takeover run if the perm disabled or unhealthy
2104                    flags changed since these will cause an ip failover but not
2105                    a recovery.
2106                    If the node became disconnected or banned this will also
2107                    lead to an ip address failover but that is handled 
2108                    during recovery
2109                 */
2110                 if (disabled_flag_changed) {
2111                         rec->need_takeover_run = true;
2112                 }
2113         }
2114
2115         talloc_free(tmp_ctx);
2116 }
2117
2118 /*
2119   handler for when we need to push out flag changes to all other nodes
2120 */
2121 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2122                             TDB_DATA data, void *private_data)
2123 {
2124         int ret;
2125         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2126         struct ctdb_node_map *nodemap=NULL;
2127         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2128         uint32_t recmaster;
2129         uint32_t *nodes;
2130
2131         /* find the recovery master */
2132         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2133         if (ret != 0) {
2134                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2135                 talloc_free(tmp_ctx);
2136                 return;
2137         }
2138
2139         /* read the node flags from the recmaster */
2140         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2141         if (ret != 0) {
2142                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2143                 talloc_free(tmp_ctx);
2144                 return;
2145         }
2146         if (c->pnn >= nodemap->num) {
2147                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2148                 talloc_free(tmp_ctx);
2149                 return;
2150         }
2151
2152         /* send the flags update to all connected nodes */
2153         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2154
2155         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2156                                       nodes, 0, CONTROL_TIMEOUT(),
2157                                       false, data,
2158                                       NULL, NULL,
2159                                       NULL) != 0) {
2160                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2161
2162                 talloc_free(tmp_ctx);
2163                 return;
2164         }
2165
2166         talloc_free(tmp_ctx);
2167 }
2168
2169
2170 struct verify_recmode_normal_data {
2171         uint32_t count;
2172         enum monitor_result status;
2173 };
2174
2175 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2176 {
2177         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2178
2179
2180         /* one more node has responded with recmode data*/
2181         rmdata->count--;
2182
2183         /* if we failed to get the recmode, then return an error and let
2184            the main loop try again.
2185         */
2186         if (state->state != CTDB_CONTROL_DONE) {
2187                 if (rmdata->status == MONITOR_OK) {
2188                         rmdata->status = MONITOR_FAILED;
2189                 }
2190                 return;
2191         }
2192
2193         /* if we got a response, then the recmode will be stored in the
2194            status field
2195         */
2196         if (state->status != CTDB_RECOVERY_NORMAL) {
2197                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2198                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2199         }
2200
2201         return;
2202 }
2203
2204
2205 /* verify that all nodes are in normal recovery mode */
2206 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2207 {
2208         struct verify_recmode_normal_data *rmdata;
2209         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2210         struct ctdb_client_control_state *state;
2211         enum monitor_result status;
2212         int j;
2213         
2214         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2215         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2216         rmdata->count  = 0;
2217         rmdata->status = MONITOR_OK;
2218
2219         /* loop over all active nodes and send an async getrecmode call to 
2220            them*/
2221         for (j=0; j<nodemap->num; j++) {
2222                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2223                         continue;
2224                 }
2225                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2226                                         CONTROL_TIMEOUT(), 
2227                                         nodemap->nodes[j].pnn);
2228                 if (state == NULL) {
2229                         /* we failed to send the control, treat this as 
2230                            an error and try again next iteration
2231                         */                      
2232                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2233                         talloc_free(mem_ctx);
2234                         return MONITOR_FAILED;
2235                 }
2236
2237                 /* set up the callback functions */
2238                 state->async.fn = verify_recmode_normal_callback;
2239                 state->async.private_data = rmdata;
2240
2241                 /* one more control to wait for to complete */
2242                 rmdata->count++;
2243         }
2244
2245
2246         /* now wait for up to the maximum number of seconds allowed
2247            or until all nodes we expect a response from have replied
2248         */
2249         while (rmdata->count > 0) {
2250                 event_loop_once(ctdb->ev);
2251         }
2252
2253         status = rmdata->status;
2254         talloc_free(mem_ctx);
2255         return status;
2256 }
2257
2258
2259 struct verify_recmaster_data {
2260         struct ctdb_recoverd *rec;
2261         uint32_t count;
2262         uint32_t pnn;
2263         enum monitor_result status;
2264 };
2265
2266 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2267 {
2268         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2269
2270
2271         /* one more node has responded with recmaster data*/
2272         rmdata->count--;
2273
2274         /* if we failed to get the recmaster, then return an error and let
2275            the main loop try again.
2276         */
2277         if (state->state != CTDB_CONTROL_DONE) {
2278                 if (rmdata->status == MONITOR_OK) {
2279                         rmdata->status = MONITOR_FAILED;
2280                 }
2281                 return;
2282         }
2283
2284         /* if we got a response, then the recmaster will be stored in the
2285            status field
2286         */
2287         if (state->status != rmdata->pnn) {
2288                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2289                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2290                 rmdata->status = MONITOR_ELECTION_NEEDED;
2291         }
2292
2293         return;
2294 }
2295
2296
2297 /* verify that all nodes agree that we are the recmaster */
2298 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2299 {
2300         struct ctdb_context *ctdb = rec->ctdb;
2301         struct verify_recmaster_data *rmdata;
2302         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2303         struct ctdb_client_control_state *state;
2304         enum monitor_result status;
2305         int j;
2306         
2307         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2308         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2309         rmdata->rec    = rec;
2310         rmdata->count  = 0;
2311         rmdata->pnn    = pnn;
2312         rmdata->status = MONITOR_OK;
2313
2314         /* loop over all active nodes and send an async getrecmaster call to 
2315            them*/
2316         for (j=0; j<nodemap->num; j++) {
2317                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2318                         continue;
2319                 }
2320                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2321                                         CONTROL_TIMEOUT(),
2322                                         nodemap->nodes[j].pnn);
2323                 if (state == NULL) {
2324                         /* we failed to send the control, treat this as 
2325                            an error and try again next iteration
2326                         */                      
2327                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2328                         talloc_free(mem_ctx);
2329                         return MONITOR_FAILED;
2330                 }
2331
2332                 /* set up the callback functions */
2333                 state->async.fn = verify_recmaster_callback;
2334                 state->async.private_data = rmdata;
2335
2336                 /* one more control to wait for to complete */
2337                 rmdata->count++;
2338         }
2339
2340
2341         /* now wait for up to the maximum number of seconds allowed
2342            or until all nodes we expect a response from have replied
2343         */
2344         while (rmdata->count > 0) {
2345                 event_loop_once(ctdb->ev);
2346         }
2347
2348         status = rmdata->status;
2349         talloc_free(mem_ctx);
2350         return status;
2351 }
2352
2353
2354 /* called to check that the allocation of public ip addresses is ok.
2355 */
2356 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2357 {
2358         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2359         struct ctdb_all_public_ips *ips = NULL;
2360         struct ctdb_uptime *uptime1 = NULL;
2361         struct ctdb_uptime *uptime2 = NULL;
2362         int ret, j;
2363
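             /* sample the recovery timestamps (uptime) before and after
                reading the public ip list; if a recovery starts or finishes
                in between, the checks below skip this round instead of
                acting on stale data */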
2364         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2365                                 CTDB_CURRENT_NODE, &uptime1);
2366         if (ret != 0) {
2367                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2368                 talloc_free(mem_ctx);
2369                 return -1;
2370         }
2371
2372         /* read the ip allocation from the local node */
2373         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2374         if (ret != 0) {
2375                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2376                 talloc_free(mem_ctx);
2377                 return -1;
2378         }
2379
2380         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2381                                 CTDB_CURRENT_NODE, &uptime2);
2382         if (ret != 0) {
2383                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2384                 talloc_free(mem_ctx);
2385                 return -1;
2386         }
2387
2388         /* skip the check if the startrecovery time has changed */
2389         if (timeval_compare(&uptime1->last_recovery_started,
2390                             &uptime2->last_recovery_started) != 0) {
2391                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2392                 talloc_free(mem_ctx);
2393                 return 0;
2394         }
2395
2396         /* skip the check if the endrecovery time has changed */
2397         if (timeval_compare(&uptime1->last_recovery_finished,
2398                             &uptime2->last_recovery_finished) != 0) {
2399                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2400                 talloc_free(mem_ctx);
2401                 return 0;
2402         }
2403
2404         /* skip the check if we have started but not finished recovery */
2405         if (timeval_compare(&uptime1->last_recovery_finished,
2406                             &uptime1->last_recovery_started) != 1) {
2407                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2408                 talloc_free(mem_ctx);
2409
2410                 return 0;
2411         }
2412
2413         /* verify that we have the ip addresses we should have
2414            and we don't have ones we shouldn't have.
2415            if we find an inconsistency we set recmode to
2416            active on the local node and wait for the recmaster
2417            to do a full blown recovery
2418         */
2419         for (j=0; j<ips->num; j++) {
2420                 if (ips->ips[j].pnn == pnn) {
2421                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2422                                 struct takeover_run_reply rd;
2423                                 TDB_DATA data;
2424
2425                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2426                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2427
2428                                 rd.pnn   = ctdb->pnn;
2429                                 rd.srvid = 0;
2430                                 data.dptr = (uint8_t *)&rd;
2431                                 data.dsize = sizeof(rd);
2432
2433                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2434                                 if (ret != 0) {
2435                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2436                                 }
2437                         }
2438                 } else {
2439                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2440                                 struct takeover_run_reply rd;
2441                                 TDB_DATA data;
2442
2443                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2444                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2445
2446                                 rd.pnn   = ctdb->pnn;
2447                                 rd.srvid = 0;
2448                                 data.dptr = (uint8_t *)&rd;
2449                                 data.dsize = sizeof(rd);
2450
2451                                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2452                                 if (ret != 0) {
2453                                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2454                                 }
2455                         }
2456                 }
2457         }
2458
2459         talloc_free(mem_ctx);
2460         return 0;
2461 }
2462
2463
2464 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2465 {
2466         struct ctdb_node_map **remote_nodemaps = callback_data;
2467
2468         if (node_pnn >= ctdb->num_nodes) {
2469                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2470                 return;
2471         }
2472
2473         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2474
2475 }
2476
2477 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2478         struct ctdb_node_map *nodemap,
2479         struct ctdb_node_map **remote_nodemaps)
2480 {
2481         uint32_t *nodes;
2482
2483         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2484         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2485                                         nodes, 0,
2486                                         CONTROL_TIMEOUT(), false, tdb_null,
2487                                         async_getnodemap_callback,
2488                                         NULL,
2489                                         remote_nodemaps) != 0) {
2490                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2491
2492                 return -1;
2493         }
2494
2495         return 0;
2496 }
2497
2498 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2499 struct ctdb_check_reclock_state {
2500         struct ctdb_context *ctdb;
2501         struct timeval start_time;
2502         int fd[2];
2503         pid_t child;
2504         struct timed_event *te;
2505         struct fd_event *fde;
2506         enum reclock_child_status status;
2507 };
2508
2509 /* when we free the reclock state we must kill any child process.
2510 */
2511 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2512 {
2513         struct ctdb_context *ctdb = state->ctdb;
2514
2515         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2516
2517         if (state->fd[0] != -1) {
2518                 close(state->fd[0]);
2519                 state->fd[0] = -1;
2520         }
2521         if (state->fd[1] != -1) {
2522                 close(state->fd[1]);
2523                 state->fd[1] = -1;
2524         }
2525         kill(state->child, SIGKILL);
2526         return 0;
2527 }
2528
2529 /*
2530   called if our check_reclock child times out. this would happen if
2531   i/o to the reclock file blocks.
2532  */
2533 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2534                                          struct timeval t, void *private_data)
2535 {
2536         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2537                                            struct ctdb_check_reclock_state);
2538
2539         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2540         state->status = RECLOCK_TIMEOUT;
2541 }
2542
2543 /* this is called when the child process has completed checking the reclock
2544    file and has written data back to us through the pipe.
2545 */
2546 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2547                              uint16_t flags, void *private_data)
2548 {
2549         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2550                                              struct ctdb_check_reclock_state);
2551         char c = 0;
2552         int ret;
2553
2554         /* we got a response from our child process so we can abort the
2555            timeout.
2556         */
2557         talloc_free(state->te);
2558         state->te = NULL;
2559
2560         ret = read(state->fd[0], &c, 1);
2561         if (ret != 1 || c != RECLOCK_OK) {
2562                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2563                 state->status = RECLOCK_FAILED;
2564
2565                 return;
2566         }
2567
2568         state->status = RECLOCK_OK;
2569         return;
2570 }
2571
2572 static int check_recovery_lock(struct ctdb_context *ctdb)
2573 {
2574         int ret;
2575         struct ctdb_check_reclock_state *state;
2576         pid_t parent = getpid();
2577
2578         if (ctdb->recovery_lock_fd == -1) {
2579                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2580                 return -1;
2581         }
2582
2583         state = talloc(ctdb, struct ctdb_check_reclock_state);
2584         CTDB_NO_MEMORY(ctdb, state);
2585
2586         state->ctdb = ctdb;
2587         state->start_time = timeval_current();
2588         state->status = RECLOCK_CHECKING;
2589         state->fd[0] = -1;
2590         state->fd[1] = -1;
2591
2592         ret = pipe(state->fd);
2593         if (ret != 0) {
2594                 talloc_free(state);
2595                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2596                 return -1;
2597         }
2598
2599         state->child = fork();
2600         if (state->child == (pid_t)-1) {
2601                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2602                 close(state->fd[0]);
2603                 state->fd[0] = -1;
2604                 close(state->fd[1]);
2605                 state->fd[1] = -1;
2606                 talloc_free(state);
2607                 return -1;
2608         }
2609
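             /* child: pread() one byte from the fd that already holds the
                recovery lock and report RECLOCK_OK or RECLOCK_FAILED to the
                parent through the pipe, repeating every few seconds until
                the parent goes away */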
2610         if (state->child == 0) {
2611                 char cc = RECLOCK_OK;
2612                 close(state->fd[0]);
2613                 state->fd[0] = -1;
2614
2615                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2616                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2617                         cc = RECLOCK_FAILED;
2618                 }
2619
2620                 write(state->fd[1], &cc, 1);
2621                 /* make sure we die when our parent dies */
2622                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2623                         sleep(5);
2624                         write(state->fd[1], &cc, 1);
2625                 }
2626                 _exit(0);
2627         }
2628         close(state->fd[1]);
2629         state->fd[1] = -1;
2630         set_close_on_exec(state->fd[0]);
2631
2632         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2633
2634         talloc_set_destructor(state, check_reclock_destructor);
2635
2636         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2637                                     ctdb_check_reclock_timeout, state);
2638         if (state->te == NULL) {
2639                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2640                 talloc_free(state);
2641                 return -1;
2642         }
2643
2644         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2645                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2646                                 reclock_child_handler,
2647                                 (void *)state);
2648
2649         if (state->fde == NULL) {
2650                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2651                 talloc_free(state);
2652                 return -1;
2653         }
2654
2655         while (state->status == RECLOCK_CHECKING) {
2656                 event_loop_once(ctdb->ev);
2657         }
2658
2659         if (state->status == RECLOCK_FAILED) {
2660                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2661                 close(ctdb->recovery_lock_fd);
2662                 ctdb->recovery_lock_fd = -1;
2663                 talloc_free(state);
2664                 return -1;
2665         }
2666
2667         talloc_free(state);
2668         return 0;
2669 }
2670
2671 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2672 {
2673         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2674         const char *reclockfile;
2675
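             /* ask the main ctdb daemon which reclock file is currently
                configured and reconcile it with our cached copy: when it is
                disabled or has changed we close any held lock fd and clear
                the verify_recovery_lock tunable */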
2676         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2677                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2678                 talloc_free(tmp_ctx);
2679                 return -1;      
2680         }
2681
2682         if (reclockfile == NULL) {
2683                 if (ctdb->recovery_lock_file != NULL) {
2684                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2685                         talloc_free(ctdb->recovery_lock_file);
2686                         ctdb->recovery_lock_file = NULL;
2687                         if (ctdb->recovery_lock_fd != -1) {
2688                                 close(ctdb->recovery_lock_fd);
2689                                 ctdb->recovery_lock_fd = -1;
2690                         }
2691                 }
2692                 ctdb->tunable.verify_recovery_lock = 0;
2693                 talloc_free(tmp_ctx);
2694                 return 0;
2695         }
2696
2697         if (ctdb->recovery_lock_file == NULL) {
2698                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2699                 if (ctdb->recovery_lock_fd != -1) {
2700                         close(ctdb->recovery_lock_fd);
2701                         ctdb->recovery_lock_fd = -1;
2702                 }
2703                 talloc_free(tmp_ctx);
2704                 return 0;
2705         }
2706
2707
2708         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2709                 talloc_free(tmp_ctx);
2710                 return 0;
2711         }
2712
2713         talloc_free(ctdb->recovery_lock_file);
2714         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2715         ctdb->tunable.verify_recovery_lock = 0;
2716         if (ctdb->recovery_lock_fd != -1) {
2717                 close(ctdb->recovery_lock_fd);
2718                 ctdb->recovery_lock_fd = -1;
2719         }
2720
2721         talloc_free(tmp_ctx);
2722         return 0;
2723 }
2724                 
2725 /*
2726   the main monitoring loop
2727  */
2728 static void monitor_cluster(struct ctdb_context *ctdb)
2729 {
2730         uint32_t pnn;
2731         TALLOC_CTX *mem_ctx=NULL;
2732         struct ctdb_node_map *nodemap=NULL;
2733         struct ctdb_node_map *recmaster_nodemap=NULL;
2734         struct ctdb_node_map **remote_nodemaps=NULL;
2735         struct ctdb_vnn_map *vnnmap=NULL;
2736         struct ctdb_vnn_map *remote_vnnmap=NULL;
2737         int32_t debug_level;
2738         int i, j, ret;
2739         struct ctdb_recoverd *rec;
2740
2741         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2742
2743         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2744         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2745
2746         rec->ctdb = ctdb;
2747
2748         rec->priority_time = timeval_current();
2749
2750         /* register a message port for sending memory dumps */
2751         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2752
2753         /* register a message port for recovery elections */
2754         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2755
2756         /* when nodes are disabled/enabled */
2757         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2758
2759         /* when we are asked to push out a flag change */
2760         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2761
2762         /* register a message port for vacuum fetch */
2763         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2764
2765         /* register a message port for reloadnodes  */
2766         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2767
2768         /* register a message port for performing a takeover run */
2769         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2770
2771         /* register a message port for disabling the ip check for a short while */
2772         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2773
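        /* main monitoring loop: the code below runs once every
           'recover_interval' seconds and jumps back to this label whenever
           a check fails or the cluster state needs to be re-read */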
2774 again:
2775         if (mem_ctx) {
2776                 talloc_free(mem_ctx);
2777                 mem_ctx = NULL;
2778         }
2779         mem_ctx = talloc_new(ctdb);
2780         if (!mem_ctx) {
2781                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2782                 exit(-1);
2783         }
2784
2785         /* we only check for recovery once every 'recover_interval' seconds */
2786         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2787
2788         /* verify that the main daemon is still running */
2789         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2790                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2791                 exit(-1);
2792         }
2793
2794         /* ping the local daemon to tell it we are alive */
2795         ctdb_ctrl_recd_ping(ctdb);
2796
2797         if (rec->election_timeout) {
2798                 /* an election is in progress */
2799                 goto again;
2800         }
2801
2802         /* read the debug level from the parent and update locally */
2803         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2804         if (ret !=0) {
2805                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2806                 goto again;
2807         }
2808         LogLevel = debug_level;
2809
2810
2811         /* We must check if we need to ban a node here but we want to do this
2812            as early as possible so we don't wait until we have pulled the node
2813            map from the local node. That is why we use the hardcoded value 20.
2814         */
2815         for (i=0; i<ctdb->num_nodes; i++) {
2816                 struct ctdb_banning_state *ban_state;
2817
2818                 if (ctdb->nodes[i]->ban_state == NULL) {
2819                         continue;
2820                 }
2821                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2822                 if (ban_state->count < 20) {
2823                         continue;
2824                 }
2825                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2826                         ctdb->nodes[i]->pnn, ban_state->count,
2827                         ctdb->tunable.recovery_ban_period));
2828                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
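                /* reset the counter so the node is not banned again
                   straight away on the next monitoring iteration */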
2829                 ban_state->count = 0;
2830         }
2831
2832         /* get relevant tunables */
2833         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2834         if (ret != 0) {
2835                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2836                 goto again;
2837         }
2838
2839         /* get the current recovery lock file from the server */
2840         if (update_recovery_lock_file(ctdb) != 0) {
2841                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2842                 goto again;
2843         }
2844
2845         /* Make sure that if recovery lock verification becomes disabled,
2846            we close the file
2847         */
2848         if (ctdb->tunable.verify_recovery_lock == 0) {
2849                 if (ctdb->recovery_lock_fd != -1) {
2850                         close(ctdb->recovery_lock_fd);
2851                         ctdb->recovery_lock_fd = -1;
2852                 }
2853         }
2854
2855         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2856         if (pnn == (uint32_t)-1) {
2857                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2858                 goto again;
2859         }
2860
2861         /* get the vnnmap */
2862         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2863         if (ret != 0) {
2864                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2865                 goto again;
2866         }
2867
2868
2869         /* get number of nodes */
2870         if (rec->nodemap) {
2871                 talloc_free(rec->nodemap);
2872                 rec->nodemap = NULL;
2873                 nodemap=NULL;
2874         }
2875         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2876         if (ret != 0) {
2877                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2878                 goto again;
2879         }
2880         nodemap = rec->nodemap;
2881
2882         /* check which node is the recovery master */
2883         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2884         if (ret != 0) {
2885                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2886                 goto again;
2887         }
2888
2889         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2890         if (rec->recmaster != pnn) {
2891                 if (rec->ip_reallocate_ctx != NULL) {
2892                         talloc_free(rec->ip_reallocate_ctx);
2893                         rec->ip_reallocate_ctx = NULL;
2894                         rec->reallocate_callers = NULL;
2895                 }
2896         }
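        /* non-recmaster nodes dropped their callers above, so only the
           recmaster will actually process queued requests here */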
2897         /* if there are takeovers requested, perform them and notify the waiters */
2898         if (rec->reallocate_callers) {
2899                 process_ipreallocate_requests(ctdb, rec);
2900         }
2901
2902         if (rec->recmaster == (uint32_t)-1) {
2903                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2904                 force_election(rec, pnn, nodemap);
2905                 goto again;
2906         }
2907
2908
2909         /* if the local daemon is STOPPED, we verify that the databases are
2910            also frozen and that the recmode is set to active
2911         */
2912         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2913                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2914                 if (ret != 0) {
2915                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2916                 }
2917                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2918                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2919
2920                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2921                         if (ret != 0) {
2922                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2923                                 goto again;
2924                         }
2925                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2926                         if (ret != 0) {
2927                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2928
2929                                 goto again;
2930                         }
2931                         goto again;
2932                 }
2933         }
2934         /* If the local node is stopped, verify we are not the recmaster 
2935            and yield this role if so
2936         */
2937         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2938                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2939                 force_election(rec, pnn, nodemap);
2940                 goto again;
2941         }
2942         
2943         /* check that we (recovery daemon) and the local ctdb daemon
2944            agree on whether we are banned or not
2945         */
2946 //qqq
2947
2948         /* remember our own node flags */
2949         rec->node_flags = nodemap->nodes[pnn].flags;
2950
2951         /* count how many active nodes there are */
2952         rec->num_active    = 0;
2953         rec->num_connected = 0;
2954         for (i=0; i<nodemap->num; i++) {
2955                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2956                         rec->num_active++;
2957                 }
2958                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2959                         rec->num_connected++;
2960                 }
2961         }
2962
2963
2964         /* verify that the recmaster node is still active */
2965         for (j=0; j<nodemap->num; j++) {
2966                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2967                         break;
2968                 }
2969         }
2970
2971         if (j == nodemap->num) {
2972                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2973                 force_election(rec, pnn, nodemap);
2974                 goto again;
2975         }
2976
2977         /* if recovery master is disconnected we must elect a new recmaster */
2978         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2979                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2980                 force_election(rec, pnn, nodemap);
2981                 goto again;
2982         }
2983
2984         /* grab the nodemap from the recovery master to check if it is banned */
2985         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2986                                    mem_ctx, &recmaster_nodemap);
2987         if (ret != 0) {
2988                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2989                           nodemap->nodes[j].pnn));
2990                 goto again;
2991         }
2992
2993
2994         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2995                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2996                 force_election(rec, pnn, nodemap);
2997                 goto again;
2998         }
2999
3000
3001         /* verify that we have all the ip addresses we should have and that
3002          * we don't have addresses we shouldn't have.
3003          */ 
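        /* a non-NULL ip_check_disable_ctx means this check has been
           temporarily disabled via CTDB_SRVID_DISABLE_IP_CHECK */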
3004         if (ctdb->do_checkpublicip) {
3005                 if (rec->ip_check_disable_ctx == NULL) {
3006                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
3007                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3008                         }
3009                 }
3010         }
3011
3012
3013         /* if we are not the recmaster then we do not need to check
3014            if recovery is needed
3015          */
3016         if (pnn != rec->recmaster) {
3017                 goto again;
3018         }
3019
3020
3021         /* ensure our local copies of flags are right */
3022         ret = update_local_flags(rec, nodemap);
3023         if (ret == MONITOR_ELECTION_NEEDED) {
3024                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3025                 force_election(rec, pnn, nodemap);
3026                 goto again;
3027         }
3028         if (ret != MONITOR_OK) {
3029                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3030                 goto again;
3031         }
3032
3033         /* update the list of public ips that a node can handle for
3034            all connected nodes
3035         */
3036         if (ctdb->num_nodes != nodemap->num) {
3037                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3038                 reload_nodes_file(ctdb);
3039                 goto again;
3040         }
3041         for (j=0; j<nodemap->num; j++) {
3042                 /* release any existing data */
3043                 if (ctdb->nodes[j]->public_ips) {
3044                         talloc_free(ctdb->nodes[j]->public_ips);
3045                         ctdb->nodes[j]->public_ips = NULL;
3046                 }
3047
3048                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3049                         continue;
3050                 }
3051
3052                 /* grab a new shiny list of public ips from the node */
3053                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
3054                         ctdb->nodes[j]->pnn, 
3055                         ctdb->nodes,
3056                         &ctdb->nodes[j]->public_ips)) {
3057                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
3058                                 ctdb->nodes[j]->pnn));
3059                         goto again;
3060                 }
3061         }
3062
3063
3064         /* verify that all active nodes agree that we are the recmaster */
3065         switch (verify_recmaster(rec, nodemap, pnn)) {
3066         case MONITOR_RECOVERY_NEEDED:
3067                 /* can not happen */
3068                 goto again;
3069         case MONITOR_ELECTION_NEEDED:
3070                 force_election(rec, pnn, nodemap);
3071                 goto again;
3072         case MONITOR_OK:
3073                 break;
3074         case MONITOR_FAILED:
3075                 goto again;
3076         }
3077
3078
3079         if (rec->need_recovery) {
3080                 /* a previous recovery didn't finish */
3081                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3082                 goto again;             
3083         }
3084
3085         /* verify that all active nodes are in normal mode 
3086            and not in recovery mode 
3087         */
3088         switch (verify_recmode(ctdb, nodemap)) {
3089         case MONITOR_RECOVERY_NEEDED:
3090                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3091                 goto again;
3092         case MONITOR_FAILED:
3093                 goto again;
3094         case MONITOR_ELECTION_NEEDED:
3095                 /* can not happen */
3096         case MONITOR_OK:
3097                 break;
3098         }
3099
3100
3101         if (ctdb->tunable.verify_recovery_lock != 0) {
3102                 /* we should have the reclock - check its not stale */
3103                 ret = check_recovery_lock(ctdb);
3104                 if (ret != 0) {
3105                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3106                         ctdb_set_culprit(rec, ctdb->pnn);
3107                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3108                         goto again;
3109                 }
3110         }
3111
3112         /* get the nodemap for all active remote nodes
3113          */
3114         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3115         if (remote_nodemaps == NULL) {
3116                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3117                 goto again;
3118         }
3119         for(i=0; i<nodemap->num; i++) {
3120                 remote_nodemaps[i] = NULL;
3121         }
3122         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3123                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3124                 goto again;
3125         } 
3126
3127         /* verify that all other nodes have the same nodemap as we have
3128         */
3129         for (j=0; j<nodemap->num; j++) {
3130                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3131                         continue;
3132                 }
3133
3134                 if (remote_nodemaps[j] == NULL) {
3135                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3136                         ctdb_set_culprit(rec, j);
3137
3138                         goto again;
3139                 }
3140
3141                 /* if the nodes disagree on how many nodes there are
3142                    then this is a good reason to try recovery
3143                  */
3144                 if (remote_nodemaps[j]->num != nodemap->num) {
3145                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3146                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3147                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3148                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3149                         goto again;
3150                 }
3151
3152                 /* if the nodes disagree on which nodes exist and are
3153                    active, then that is also a good reason to do recovery
3154                  */
3155                 for (i=0;i<nodemap->num;i++) {
3156                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3157                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3158                                           nodemap->nodes[j].pnn, i, 
3159                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3160                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3161                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3162                                             vnnmap);
3163                                 goto again;
3164                         }
3165                 }
3166
3167                 /* verify the flags are consistent
3168                 */
3169                 for (i=0; i<nodemap->num; i++) {
3170                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3171                                 continue;
3172                         }
3173                         
3174                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3175                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3176                                   nodemap->nodes[j].pnn, 
3177                                   nodemap->nodes[i].pnn, 
3178                                   remote_nodemaps[j]->nodes[i].flags,
3179                                   nodemap->nodes[i].flags));
3180                                 if (i == j) {
3181                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3182                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3183                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3184                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3185                                                     vnnmap);
3186                                         goto again;
3187                                 } else {
3188                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3189                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3190                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3191                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3192                                                     vnnmap);
3193                                         goto again;
3194                                 }
3195                         }
3196                 }
3197         }
3198
3199
3200         /* there better be the same number of lmasters in the vnn map
3201            as there are active nodes or we will have to do a recovery
3202          */
3203         if (vnnmap->size != rec->num_active) {
3204                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3205                           vnnmap->size, rec->num_active));
3206                 ctdb_set_culprit(rec, ctdb->pnn);
3207                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3208                 goto again;
3209         }
3210
3211         /* verify that all active nodes in the nodemap also exist in 
3212            the vnnmap.
3213          */
3214         for (j=0; j<nodemap->num; j++) {
3215                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3216                         continue;
3217                 }
3218                 if (nodemap->nodes[j].pnn == pnn) {
3219                         continue;
3220                 }
3221
3222                 for (i=0; i<vnnmap->size; i++) {
3223                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3224                                 break;
3225                         }
3226                 }
3227                 if (i == vnnmap->size) {
3228                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3229                                   nodemap->nodes[j].pnn));
3230                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3231                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3232                         goto again;
3233                 }
3234         }
3235
3236         
3237         /* verify that all other nodes have the same vnnmap
3238            and are from the same generation
3239          */
3240         for (j=0; j<nodemap->num; j++) {
3241                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3242                         continue;
3243                 }
3244                 if (nodemap->nodes[j].pnn == pnn) {
3245                         continue;
3246                 }
3247
3248                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3249                                           mem_ctx, &remote_vnnmap);
3250                 if (ret != 0) {
3251                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3252                                   nodemap->nodes[j].pnn));
3253                         goto again;
3254                 }
3255
3256                 /* verify the vnnmap generation is the same */
3257                 if (vnnmap->generation != remote_vnnmap->generation) {
3258                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3259                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3260                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3261                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3262                         goto again;
3263                 }
3264
3265                 /* verify the vnnmap size is the same */
3266                 if (vnnmap->size != remote_vnnmap->size) {
3267                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3268                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3269                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3270                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3271                         goto again;
3272                 }
3273
3274                 /* verify the vnnmap is the same */
3275                 for (i=0;i<vnnmap->size;i++) {
3276                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3277                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3278                                           nodemap->nodes[j].pnn));
3279                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3280                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3281                                             vnnmap);
3282                                 goto again;
3283                         }
3284                 }
3285         }
3286
3287         /* we might need to change who has what IP assigned */
3288         if (rec->need_takeover_run) {
3289                 rec->need_takeover_run = false;
3290
3291                 /* execute the "startrecovery" event script on all nodes */
3292                 ret = run_startrecovery_eventscript(rec, nodemap);
3293                 if (ret!=0) {
3294                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3295                         ctdb_set_culprit(rec, ctdb->pnn);
3296                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3297                 }
3298
3299                 ret = ctdb_takeover_run(ctdb, nodemap);
3300                 if (ret != 0) {
3301                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3302                         ctdb_set_culprit(rec, ctdb->pnn);
3303                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3304                 }
3305
3306                 /* execute the "recovered" event script on all nodes */
3307                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3308 #if 0
3309 // we can't check whether the event completed successfully
3310 // since this script WILL fail if the node is in recovery mode
3311 // and if that race happens, the code here would just cause a second
3312 // cascading recovery.
3313                 if (ret!=0) {
3314                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3315                         ctdb_set_culprit(rec, ctdb->pnn);
3316                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3317                 }
3318 #endif
3319         }
3320
3321
3322         goto again;
3323
3324 }
3325
3326 /*
3327   event handler for when the main ctdbd dies
3328  */
3329 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3330                                  uint16_t flags, void *private_data)
3331 {
3332         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3333         _exit(1);
3334 }
3335
3336 /*
3337   called regularly to verify that the recovery daemon is still running
3338  */
3339 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3340                               struct timeval yt, void *p)
3341 {
3342         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3343
3344         if (kill(ctdb->recoverd_pid, 0) != 0) {
3345                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3346
3347                 ctdb_stop_recoverd(ctdb);
3348                 ctdb_stop_keepalive(ctdb);
3349                 ctdb_stop_monitoring(ctdb);
3350                 ctdb_release_all_ips(ctdb);
3351                 if (ctdb->methods != NULL) {
3352                         ctdb->methods->shutdown(ctdb);
3353                 }
3354                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3355
3356                 exit(10);       
3357         }
3358
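        /* reschedule this check to run again in 30 seconds */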
3359         event_add_timed(ctdb->ev, ctdb, 
3360                         timeval_current_ofs(30, 0),
3361                         ctdb_check_recd, ctdb);
3362 }
3363
3364 static void recd_sig_child_handler(struct event_context *ev,
3365         struct signal_event *se, int signum, int count,
3366         void *dont_care, 
3367         void *private_data)
3368 {
3369 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3370         int status;
3371         pid_t pid = -1;
3372
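        /* reap all exited children without blocking so that no zombie
           processes are left behind */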
3373         while (pid != 0) {
3374                 pid = waitpid(-1, &status, WNOHANG);
3375                 if (pid == -1) {
3376                         if (errno != ECHILD) {
3377                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3378                         }
3379                         return;
3380                 }
3381                 if (pid > 0) {
3382                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3383                 }
3384         }
3385 }
3386
3387 /*
3388   startup the recovery daemon as a child of the main ctdb daemon
3389  */
3390 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3391 {
3392         int fd[2];
3393         struct signal_event *se;
3394
3395         if (pipe(fd) != 0) {
3396                 return -1;
3397         }
3398
3399         ctdb->ctdbd_pid = getpid();
3400
3401         ctdb->recoverd_pid = fork();
3402         if (ctdb->recoverd_pid == -1) {
3403                 return -1;
3404         }
3405         
3406         if (ctdb->recoverd_pid != 0) {
3407                 close(fd[0]);
3408                 event_add_timed(ctdb->ev, ctdb, 
3409                                 timeval_current_ofs(30, 0),
3410                                 ctdb_check_recd, ctdb);
3411                 return 0;
3412         }
3413
3414         close(fd[1]);
3415
3416         srandom(getpid() ^ time(NULL));
3417
3418         if (switch_from_server_to_client(ctdb) != 0) {
3419                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3420                 exit(1);
3421         }
3422
3423         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3424
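        /* the main daemon keeps the write end of the pipe open; when it
           exits, the read end becomes readable (EOF) and
           ctdb_recoverd_parent() will terminate this recovery daemon */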
3425         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3426                      ctdb_recoverd_parent, &fd[0]);     
3427
3428         /* set up a handler to pick up sigchld */
3429         se = event_add_signal(ctdb->ev, ctdb,
3430                                      SIGCHLD, 0,
3431                                      recd_sig_child_handler,
3432                                      ctdb);
3433         if (se == NULL) {
3434                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3435                 exit(1);
3436         }
3437
3438         monitor_cluster(ctdb);
3439
3440         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3441         return -1;
3442 }
3443
3444 /*
3445   shutdown the recovery daemon
3446  */
3447 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3448 {
3449         if (ctdb->recoverd_pid == 0) {
3450                 return;
3451         }
3452
3453         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3454         kill(ctdb->recoverd_pid, SIGTERM);
3455 }