recoverd: main_loop() should not verify local IPs if node is stopped
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* the most recent "reload all ips" request, which we need to perform
34    during the next monitoring loop
35 */
36 struct reloadips_all_reply *reload_all_ips_request = NULL;
37
38 /* list of "ctdb ipreallocate" processes to call back when we have
39    finished the takeover run.
40 */
41 struct ip_reallocate_list {
42         struct ip_reallocate_list *next;
43         struct rd_memdump_reply *rd;
44 };
45
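/* per-node banning state: how many "culprit" credits a node has
   accumulated recently and when we last charged it, so that old
   transgressions can be forgiven after recovery_grace_period */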
46 struct ctdb_banning_state {
47         uint32_t count;
48         struct timeval last_reported_time;
49 };
50
51 /*
52   private state of recovery daemon
53  */
54 struct ctdb_recoverd {
55         struct ctdb_context *ctdb;
56         uint32_t recmaster;
57         uint32_t num_active;
58         uint32_t num_connected;
59         uint32_t last_culprit_node;
60         struct ctdb_node_map *nodemap;
61         struct timeval priority_time;
62         bool need_takeover_run;
63         bool need_recovery;
64         uint32_t node_flags;
65         struct timed_event *send_election_te;
66         struct timed_event *election_timeout;
67         struct vacuum_info *vacuum_info;
68         TALLOC_CTX *ip_reallocate_ctx;
69         struct ip_reallocate_list *reallocate_callers;
70         TALLOC_CTX *ip_check_disable_ctx;
71         struct ctdb_control_get_ifaces *ifaces;
72         TALLOC_CTX *deferred_rebalance_ctx;
73 };
74
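/* timeouts, both driven by tunables: controls sent during recovery use
   ctdb->tunable.recover_timeout, while the monitoring loop uses
   ctdb->tunable.recover_interval */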
75 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
76 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
77
78 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
79
80 /*
81   ban a node for a period of time
82  */
83 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
84 {
85         int ret;
86         struct ctdb_context *ctdb = rec->ctdb;
87         struct ctdb_ban_time bantime;
88        
89         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
90
91         if (!ctdb_validate_pnn(ctdb, pnn)) {
92                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
93                 return;
94         }
95
96         bantime.pnn  = pnn;
97         bantime.time = ban_time;
98
99         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
100         if (ret != 0) {
101                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
102                 return;
103         }
104
105 }
106
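/* outcome of one monitoring pass: everything is fine, a recovery is
   needed, a new recmaster election is needed, or the monitoring checks
   themselves could not be completed */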
107 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
108
109
110 /*
111   run the "recovered" eventscript on all nodes
112  */
113 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
114 {
115         TALLOC_CTX *tmp_ctx;
116         uint32_t *nodes;
117
118         tmp_ctx = talloc_new(ctdb);
119         CTDB_NO_MEMORY(ctdb, tmp_ctx);
120
121         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
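        /* broadcast the END_RECOVERY control to every active node and
           wait for the replies; a failure on any node is treated here as
           a failure of the whole call */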
122         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
123                                         nodes, 0,
124                                         CONTROL_TIMEOUT(), false, tdb_null,
125                                         NULL, NULL,
126                                         NULL) != 0) {
127                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
128
129                 talloc_free(tmp_ctx);
130                 return -1;
131         }
132
133         talloc_free(tmp_ctx);
134         return 0;
135 }
136
137 /*
138   remember the trouble maker
139  */
140 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
141 {
142         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
143         struct ctdb_banning_state *ban_state;
144
145         if (culprit >= ctdb->num_nodes) {
146                 DEBUG(DEBUG_ERR,("Trying to set culprit %u but num_nodes is %u\n", culprit, ctdb->num_nodes));
147                 return;
148         }
149
150         if (ctdb->nodes[culprit]->ban_state == NULL) {
151                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
152                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
153
154                 
155         }
156         ban_state = ctdb->nodes[culprit]->ban_state;
157         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
158                 /* this was the first time in a long while this node
159                    misbehaved so we will forgive any old transgressions.
160                 */
161                 ban_state->count = 0;
162         }
163
164         ban_state->count += count;
165         ban_state->last_reported_time = timeval_current();
166         rec->last_culprit_node = culprit;
167 }
168
169 /*
170   remember the trouble maker, charging a single credit
171  */
172 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
173 {
174         ctdb_set_culprit_count(rec, culprit, 1);
175 }
176
177
178 /* this callback is called for every node that failed to execute the
179    start recovery event
180 */
181 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
182 {
183         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
184
185         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
186
187         ctdb_set_culprit(rec, node_pnn);
188 }
189
190 /*
191   run the "startrecovery" eventscript on all nodes
192  */
193 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
194 {
195         TALLOC_CTX *tmp_ctx;
196         uint32_t *nodes;
197         struct ctdb_context *ctdb = rec->ctdb;
198
199         tmp_ctx = talloc_new(ctdb);
200         CTDB_NO_MEMORY(ctdb, tmp_ctx);
201
202         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
203         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
204                                         nodes, 0,
205                                         CONTROL_TIMEOUT(), false, tdb_null,
206                                         NULL,
207                                         startrecovery_fail_callback,
208                                         rec) != 0) {
209                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
210                 talloc_free(tmp_ctx);
211                 return -1;
212         }
213
214         talloc_free(tmp_ctx);
215         return 0;
216 }
217
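/* callback for the GET_CAPABILITIES broadcast: stash each node's
   capabilities in the node array, and refresh our own cached copy when
   the reply comes from this node */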
218 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
219 {
220         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
221                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
222                 return;
223         }
224         if (node_pnn < ctdb->num_nodes) {
225                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
226         }
227
228         if (node_pnn == ctdb->pnn) {
229                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
230         }
231 }
232
233 /*
234   update the node capabilities for all connected nodes
235  */
236 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
237 {
238         uint32_t *nodes;
239         TALLOC_CTX *tmp_ctx;
240
241         tmp_ctx = talloc_new(ctdb);
242         CTDB_NO_MEMORY(ctdb, tmp_ctx);
243
244         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
245         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
246                                         nodes, 0,
247                                         CONTROL_TIMEOUT(),
248                                         false, tdb_null,
249                                         async_getcap_callback, NULL,
250                                         NULL) != 0) {
251                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
252                 talloc_free(tmp_ctx);
253                 return -1;
254         }
255
256         talloc_free(tmp_ctx);
257         return 0;
258 }
259
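/* a node that cannot be frozen during recovery is charged a full round of
   culprit credits (one per node in the cluster) */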
260 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
261 {
262         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
263
264         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
265         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
266 }
267
268 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
269 {
270         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
271
272         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
273         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
274 }
275
276 /*
277   change recovery mode on all nodes
278  */
279 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
280 {
281         TDB_DATA data;
282         uint32_t *nodes;
283         TALLOC_CTX *tmp_ctx;
284
285         tmp_ctx = talloc_new(ctdb);
286         CTDB_NO_MEMORY(ctdb, tmp_ctx);
287
288         /* freeze all nodes */
289         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
290         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
291                 int i;
292
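                /* freeze the databases one priority level at a time,
                   starting with priority 1 and working up to
                   NUM_DB_PRIORITIES */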
293                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
294                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
295                                                 nodes, i,
296                                                 CONTROL_TIMEOUT(),
297                                                 false, tdb_null,
298                                                 NULL,
299                                                 set_recmode_fail_callback,
300                                                 rec) != 0) {
301                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
302                                 talloc_free(tmp_ctx);
303                                 return -1;
304                         }
305                 }
306         }
307
308
309         data.dsize = sizeof(uint32_t);
310         data.dptr = (unsigned char *)&rec_mode;
311
312         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
313                                         nodes, 0,
314                                         CONTROL_TIMEOUT(),
315                                         false, data,
316                                         NULL, NULL,
317                                         NULL) != 0) {
318                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
319                 talloc_free(tmp_ctx);
320                 return -1;
321         }
322
323         talloc_free(tmp_ctx);
324         return 0;
325 }
326
327 /*
328   change recovery master on all nodes
329  */
330 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
331 {
332         TDB_DATA data;
333         TALLOC_CTX *tmp_ctx;
334         uint32_t *nodes;
335
336         tmp_ctx = talloc_new(ctdb);
337         CTDB_NO_MEMORY(ctdb, tmp_ctx);
338
339         data.dsize = sizeof(uint32_t);
340         data.dptr = (unsigned char *)&pnn;
341
342         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
343         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
344                                         nodes, 0,
345                                         CONTROL_TIMEOUT(), false, data,
346                                         NULL, NULL,
347                                         NULL) != 0) {
348                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
349                 talloc_free(tmp_ctx);
350                 return -1;
351         }
352
353         talloc_free(tmp_ctx);
354         return 0;
355 }
356
357 /* update all remote nodes to use the same db priority that we have.
358    This can fail if the remote node has not yet been upgraded to
359    support this function, so we always return success and never fail
360    a recovery if this call fails.
361 */
362 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
363         struct ctdb_node_map *nodemap, 
364         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
365 {
366         int db;
367         uint32_t *nodes;
368
369         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
370
371         /* step through all local databases */
372         for (db=0; db<dbmap->num;db++) {
373                 TDB_DATA data;
374                 struct ctdb_db_priority db_prio;
375                 int ret;
376
377                 db_prio.db_id     = dbmap->dbs[db].dbid;
378                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
379                 if (ret != 0) {
380                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
381                         continue;
382                 }
383
384                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
385
386                 data.dptr  = (uint8_t *)&db_prio;
387                 data.dsize = sizeof(db_prio);
388
389                 if (ctdb_client_async_control(ctdb,
390                                         CTDB_CONTROL_SET_DB_PRIORITY,
391                                         nodes, 0,
392                                         CONTROL_TIMEOUT(), false, data,
393                                         NULL, NULL,
394                                         NULL) != 0) {
395                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
396                 }
397         }
398
399         return 0;
400 }                       
401
402 /*
403   ensure all other nodes have attached to any databases that we have
404  */
405 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
406                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
407 {
408         int i, j, db, ret;
409         struct ctdb_dbid_map *remote_dbmap;
410
411         /* verify that all other nodes have all our databases */
412         for (j=0; j<nodemap->num; j++) {
413                 /* we don't need to check ourselves */
414                 if (nodemap->nodes[j].pnn == pnn) {
415                         continue;
416                 }
417                 /* don't check nodes that are unavailable */
418                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
419                         continue;
420                 }
421
422                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
423                                          mem_ctx, &remote_dbmap);
424                 if (ret != 0) {
425                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
426                         return -1;
427                 }
428
429                 /* step through all local databases */
430                 for (db=0; db<dbmap->num;db++) {
431                         const char *name;
432
433
434                         for (i=0;i<remote_dbmap->num;i++) {
435                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
436                                         break;
437                                 }
438                         }
439                         /* the remote node already has this database */
440                         if (i!=remote_dbmap->num) {
441                                 continue;
442                         }
443                         /* ok so we need to create this database */
444                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
445                                             mem_ctx, &name);
446                         if (ret != 0) {
447                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
448                                 return -1;
449                         }
450                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
451                                            mem_ctx, name,
452                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
453                         if (ret != 0) {
454                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
455                                 return -1;
456                         }
457                 }
458         }
459
460         return 0;
461 }
462
463
464 /*
465   ensure we are attached to any databases that anyone else is attached to
466  */
467 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
468                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
469 {
470         int i, j, db, ret;
471         struct ctdb_dbid_map *remote_dbmap;
472
473         /* verify that we have all databases that any other node has */
474         for (j=0; j<nodemap->num; j++) {
475                 /* we don't need to check ourselves */
476                 if (nodemap->nodes[j].pnn == pnn) {
477                         continue;
478                 }
479                 /* don't check nodes that are unavailable */
480                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
481                         continue;
482                 }
483
484                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
485                                          mem_ctx, &remote_dbmap);
486                 if (ret != 0) {
487                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
488                         return -1;
489                 }
490
491                 /* step through all databases on the remote node */
492                 for (db=0; db<remote_dbmap->num;db++) {
493                         const char *name;
494
495                         for (i=0;i<(*dbmap)->num;i++) {
496                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
497                                         break;
498                                 }
499                         }
500                         /* we already have this db locally */
501                         if (i!=(*dbmap)->num) {
502                                 continue;
503                         }
504                         /* ok so we need to create this database and
505                            rebuild dbmap
506                          */
507                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
508                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
509                         if (ret != 0) {
510                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
511                                           nodemap->nodes[j].pnn));
512                                 return -1;
513                         }
514                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
515                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
516                         if (ret != 0) {
517                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
518                                 return -1;
519                         }
520                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
521                         if (ret != 0) {
522                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
523                                 return -1;
524                         }
525                 }
526         }
527
528         return 0;
529 }
530
531
532 /*
533   pull the remote database contents from one node into the recdb
534  */
535 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
536                                     struct tdb_wrap *recdb, uint32_t dbid)
537 {
538         int ret;
539         TDB_DATA outdata;
540         struct ctdb_marshall_buffer *reply;
541         struct ctdb_rec_data *rec;
542         int i;
543         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
544
545         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
546                                CONTROL_TIMEOUT(), &outdata);
547         if (ret != 0) {
548                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
549                 talloc_free(tmp_ctx);
550                 return -1;
551         }
552
553         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
554
555         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
556                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
557                 talloc_free(tmp_ctx);
558                 return -1;
559         }
560         
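        /* the pulldb reply is a ctdb_marshall_buffer: a small header
           followed by reply->count ctdb_rec_data records packed back to
           back.  Each record carries its key immediately followed by its
           data, so we advance by rec->length bytes to reach the next one */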
561         rec = (struct ctdb_rec_data *)&reply->data[0];
562         
563         for (i=0;
564              i<reply->count;
565              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
566                 TDB_DATA key, data;
567                 struct ctdb_ltdb_header *hdr;
568                 TDB_DATA existing;
569                 
570                 key.dptr = &rec->data[0];
571                 key.dsize = rec->keylen;
572                 data.dptr = &rec->data[key.dsize];
573                 data.dsize = rec->datalen;
574                 
575                 hdr = (struct ctdb_ltdb_header *)data.dptr;
576
577                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
578                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
579                         talloc_free(tmp_ctx);
580                         return -1;
581                 }
582
583                 /* fetch the existing record, if any */
584                 existing = tdb_fetch(recdb->tdb, key);
585                 
586                 if (existing.dptr != NULL) {
587                         struct ctdb_ltdb_header header;
588                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
589                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
590                                          (unsigned)existing.dsize, srcnode));
591                                 free(existing.dptr);
592                                 talloc_free(tmp_ctx);
593                                 return -1;
594                         }
595                         header = *(struct ctdb_ltdb_header *)existing.dptr;
596                         free(existing.dptr);
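                        /* merge rule: overwrite the copy already in the
                           recdb only if the incoming record has a higher
                           rsn, or the same rsn while the existing copy's
                           dmaster is not the recovery master */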
597                         if (!(header.rsn < hdr->rsn ||
598                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
599                                 continue;
600                         }
601                 }
602                 
603                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
604                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
605                         talloc_free(tmp_ctx);
606                         return -1;                              
607                 }
608         }
609
610         talloc_free(tmp_ctx);
611
612         return 0;
613 }
614
615
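/* state shared by the GET_DB_SEQNUM callbacks: tracks the highest
   sequence number seen so far and the pnn of the node that reported it */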
616 struct pull_seqnum_cbdata {
617         int failed;
618         uint32_t pnn;
619         uint64_t seqnum;
620 };
621
622 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
623 {
624         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
625         uint64_t seqnum;
626
627         if (cb_data->failed != 0) {
628                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
629                 return;
630         }
631
632         if (res != 0) {
633                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
634                 cb_data->failed = 1;
635                 return;
636         }
637
638         if (outdata.dsize != sizeof(uint64_t)) {
639                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
640                 cb_data->failed = -1;
641                 return;
642         }
643
644         seqnum = *((uint64_t *)outdata.dptr);
645
646         if (seqnum > cb_data->seqnum) {
647                 cb_data->seqnum = seqnum;
648                 cb_data->pnn = node_pnn;
649         }
650 }
651
652 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
653 {
654         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
655
656         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
657         cb_data->failed = 1;
658 }
659
660 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
661                                 struct ctdb_recoverd *rec, 
662                                 struct ctdb_node_map *nodemap, 
663                                 struct tdb_wrap *recdb, uint32_t dbid)
664 {
665         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
666         uint32_t *nodes;
667         TDB_DATA data;
668         uint32_t outdata[2];
669         struct pull_seqnum_cbdata *cb_data;
670
671         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
672
673         outdata[0] = dbid;
674         outdata[1] = 0;
675
676         data.dsize = sizeof(outdata);
677         data.dptr  = (uint8_t *)&outdata[0];
678
679         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
680         if (cb_data == NULL) {
681                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
682                 talloc_free(tmp_ctx);
683                 return -1;
684         }
685
686         cb_data->failed = 0;
687         cb_data->pnn    = -1;
688         cb_data->seqnum = 0;
689         
690         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
691         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
692                                         nodes, 0,
693                                         CONTROL_TIMEOUT(), false, data,
694                                         pull_seqnum_cb,
695                                         pull_seqnum_fail_cb,
696                                         cb_data) != 0) {
697                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
698
699                 talloc_free(tmp_ctx);
700                 return -1;
701         }
702
703         if (cb_data->failed != 0) {
704                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
705                 talloc_free(tmp_ctx);
706                 return -1;
707         }
708
709         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
710                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
711                 talloc_free(tmp_ctx);
712                 return -1;
713         }
714
715         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
716
717         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
718                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
719                 talloc_free(tmp_ctx);
720                 return -1;
721         }
722
723         talloc_free(tmp_ctx);
724         return 0;
725 }
726
727
728 /*
729   pull all the remote database contents into the recdb
730  */
731 static int pull_remote_database(struct ctdb_context *ctdb,
732                                 struct ctdb_recoverd *rec, 
733                                 struct ctdb_node_map *nodemap, 
734                                 struct tdb_wrap *recdb, uint32_t dbid,
735                                 bool persistent)
736 {
737         int j;
738
739         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
740                 int ret;
741                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
742                 if (ret == 0) {
743                         return 0;
744                 }
745         }
746
747         /* pull all records from all other nodes across onto this node
748            (this merges based on rsn)
749         */
750         for (j=0; j<nodemap->num; j++) {
751                 /* don't merge from nodes that are unavailable */
752                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
753                         continue;
754                 }
755                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
756                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
757                                  nodemap->nodes[j].pnn));
758                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
759                         return -1;
760                 }
761         }
762         
763         return 0;
764 }
765
766
767 /*
768   update flags on all active nodes
769  */
770 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
771 {
772         int ret;
773
774         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
775         if (ret != 0) {
776                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
777                 return -1;
778         }
779
780         return 0;
781 }
782
783 /*
784   ensure all nodes have the same vnnmap we do
785  */
786 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
787                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
788 {
789         int j, ret;
790
791         /* push the new vnn map out to all the nodes */
792         for (j=0; j<nodemap->num; j++) {
793                 /* don't push to nodes that are unavailable */
794                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
795                         continue;
796                 }
797
798                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
799                 if (ret != 0) {
800                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
801                         return -1;
802                 }
803         }
804
805         return 0;
806 }
807
808
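/* one outstanding vacuum-fetch run: the records handed to us by another
   node, the database they belong to, and our progress through the list */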
809 struct vacuum_info {
810         struct vacuum_info *next, *prev;
811         struct ctdb_recoverd *rec;
812         uint32_t srcnode;
813         struct ctdb_db_context *ctdb_db;
814         struct ctdb_marshall_buffer *recs;
815         struct ctdb_rec_data *r;
816 };
817
818 static void vacuum_fetch_next(struct vacuum_info *v);
819
820 /*
821   called when a vacuum fetch has completed - just free it and do the next one
822  */
823 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
824 {
825         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
826         talloc_free(state);
827         vacuum_fetch_next(v);
828 }
829
830
831 /*
832   process the next element from the vacuum list
833 */
834 static void vacuum_fetch_next(struct vacuum_info *v)
835 {
836         struct ctdb_call call;
837         struct ctdb_rec_data *r;
838
839         while (v->recs->count) {
840                 struct ctdb_client_call_state *state;
841                 TDB_DATA data;
842                 struct ctdb_ltdb_header *hdr;
843
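                /* issue a dummy call (CTDB_NULL_FUNC) with the
                   immediate-migration flags set; the intent is to pull
                   the record's dmaster role over to this node so the
                   record becomes local and can later be vacuumed here */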
844                 ZERO_STRUCT(call);
845                 call.call_id = CTDB_NULL_FUNC;
846                 call.flags = CTDB_IMMEDIATE_MIGRATION;
847                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
848
849                 r = v->r;
850                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
851                 v->recs->count--;
852
853                 call.key.dptr = &r->data[0];
854                 call.key.dsize = r->keylen;
855
856                 /* ensure we don't block this daemon - just skip a record if we can't get
857                    the chainlock */
858                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
859                         continue;
860                 }
861
862                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
863                 if (data.dptr == NULL) {
864                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
865                         continue;
866                 }
867
868                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
869                         free(data.dptr);
870                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
871                         continue;
872                 }
873                 
874                 hdr = (struct ctdb_ltdb_header *)data.dptr;
875                 if (hdr->dmaster == v->rec->ctdb->pnn) {
876                         /* it's already local */
877                         free(data.dptr);
878                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
879                         continue;
880                 }
881
882                 free(data.dptr);
883
884                 state = ctdb_call_send(v->ctdb_db, &call);
885                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
886                 if (state == NULL) {
887                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
888                         talloc_free(v);
889                         return;
890                 }
891                 state->async.fn = vacuum_fetch_callback;
892                 state->async.private_data = v;
893                 return;
894         }
895
896         talloc_free(v);
897 }
898
899
900 /*
901   destroy a vacuum info structure
902  */
903 static int vacuum_info_destructor(struct vacuum_info *v)
904 {
905         DLIST_REMOVE(v->rec->vacuum_info, v);
906         return 0;
907 }
908
909
910 /*
911   handler for vacuum fetch
912 */
913 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
914                                  TDB_DATA data, void *private_data)
915 {
916         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
917         struct ctdb_marshall_buffer *recs;
918         int ret, i;
919         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
920         const char *name;
921         struct ctdb_dbid_map *dbmap=NULL;
922         bool persistent = false;
923         struct ctdb_db_context *ctdb_db;
924         struct ctdb_rec_data *r;
925         uint32_t srcnode;
926         struct vacuum_info *v;
927
928         recs = (struct ctdb_marshall_buffer *)data.dptr;
929         r = (struct ctdb_rec_data *)&recs->data[0];
930
931         if (recs->count == 0) {
932                 talloc_free(tmp_ctx);
933                 return;
934         }
935
936         srcnode = r->reqid;
937
938         for (v=rec->vacuum_info;v;v=v->next) {
939                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
940                         /* we're already working on records from this node */
941                         talloc_free(tmp_ctx);
942                         return;
943                 }
944         }
945
946         /* work out if the database is persistent */
947         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
948         if (ret != 0) {
949                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
950                 talloc_free(tmp_ctx);
951                 return;
952         }
953
954         for (i=0;i<dbmap->num;i++) {
955                 if (dbmap->dbs[i].dbid == recs->db_id) {
956                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
957                         break;
958                 }
959         }
960         if (i == dbmap->num) {
961                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
962                 talloc_free(tmp_ctx);
963                 return;         
964         }
965
966         /* find the name of this database */
967         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
968                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
969                 talloc_free(tmp_ctx);
970                 return;
971         }
972
973         /* attach to it */
974         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
975         if (ctdb_db == NULL) {
976                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
977                 talloc_free(tmp_ctx);
978                 return;
979         }
980
981         v = talloc_zero(rec, struct vacuum_info);
982         if (v == NULL) {
983                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
984                 talloc_free(tmp_ctx);
985                 return;
986         }
987
988         v->rec = rec;
989         v->srcnode = srcnode;
990         v->ctdb_db = ctdb_db;
991         v->recs = talloc_memdup(v, recs, data.dsize);
992         if (v->recs == NULL) {
993                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
994                 talloc_free(v);
995                 talloc_free(tmp_ctx);
996                 return;         
997         }
998         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
999
1000         DLIST_ADD(rec->vacuum_info, v);
1001
1002         talloc_set_destructor(v, vacuum_info_destructor);
1003
1004         vacuum_fetch_next(v);
1005         talloc_free(tmp_ctx);
1006 }
1007
1008
1009 /*
1010   called when ctdb_wait_timeout should finish
1011  */
1012 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1013                               struct timeval yt, void *p)
1014 {
1015         uint32_t *timed_out = (uint32_t *)p;
1016         (*timed_out) = 1;
1017 }
1018
1019 /*
1020   wait for a given number of seconds
1021  */
1022 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1023 {
1024         uint32_t timed_out = 0;
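        /* split the fractional part of "secs" into microseconds for the
           timeval offset */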
1025         time_t usecs = (secs - (time_t)secs) * 1000000;
1026         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1027         while (!timed_out) {
1028                 event_loop_once(ctdb->ev);
1029         }
1030 }
1031
1032 /*
1033   called when an election times out (ends)
1034  */
1035 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1036                                   struct timeval t, void *p)
1037 {
1038         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1039         rec->election_timeout = NULL;
1040         fast_start = false;
1041
1042         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1043 }
1044
1045
1046 /*
1047   wait for an election to finish. It finishes election_timeout seconds after
1048   the last election packet is received
1049  */
1050 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1051 {
1052         struct ctdb_context *ctdb = rec->ctdb;
1053         while (rec->election_timeout) {
1054                 event_loop_once(ctdb->ev);
1055         }
1056 }
1057
1058 /*
1059   Update our local flags from all remote connected nodes. 
1060   This is only run when we are, or believe we are, the recovery master
1061  */
1062 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1063 {
1064         int j;
1065         struct ctdb_context *ctdb = rec->ctdb;
1066         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1067
1068         /* get the nodemap for all active remote nodes and verify
1069            they are the same as for this node
1070          */
1071         for (j=0; j<nodemap->num; j++) {
1072                 struct ctdb_node_map *remote_nodemap=NULL;
1073                 int ret;
1074
1075                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1076                         continue;
1077                 }
1078                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1079                         continue;
1080                 }
1081
1082                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1083                                            mem_ctx, &remote_nodemap);
1084                 if (ret != 0) {
1085                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1086                                   nodemap->nodes[j].pnn));
1087                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1088                         talloc_free(mem_ctx);
1089                         return MONITOR_FAILED;
1090                 }
1091                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1092                         /* We should tell our daemon about this so it
1093                            updates its flags or else we will log the same 
1094                            message again in the next iteration of recovery.
1095                            Since we are the recovery master we can just as
1096                            well update the flags on all nodes.
1097                         */
1098                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1099                         if (ret != 0) {
1100                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1101                                 return MONITOR_FAILED;
1102                         }
1103
1104                         /* Update our local copy of the flags in the recovery
1105                            daemon.
1106                         */
1107                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1108                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1109                                  nodemap->nodes[j].flags));
1110                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1111                 }
1112                 talloc_free(remote_nodemap);
1113         }
1114         talloc_free(mem_ctx);
1115         return MONITOR_OK;
1116 }
1117
1118
1119 /* Create a new random generation id.
1120    The generation id cannot be the INVALID_GENERATION id
1121 */
1122 static uint32_t new_generation(void)
1123 {
1124         uint32_t generation;
1125
1126         while (1) {
1127                 generation = random();
1128
1129                 if (generation != INVALID_GENERATION) {
1130                         break;
1131                 }
1132         }
1133
1134         return generation;
1135 }
1136
1137
1138 /*
1139   create a temporary working database
1140  */
1141 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1142 {
1143         char *name;
1144         struct tdb_wrap *recdb;
1145         unsigned tdb_flags;
1146
1147         /* open up the temporary recovery database */
1148         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1149                                ctdb->db_directory_state,
1150                                ctdb->pnn);
1151         if (name == NULL) {
1152                 return NULL;
1153         }
1154         unlink(name);
1155
1156         tdb_flags = TDB_NOLOCK;
1157         if (ctdb->valgrinding) {
1158                 tdb_flags |= TDB_NOMMAP;
1159         }
1160         tdb_flags |= TDB_DISALLOW_NESTING;
1161
1162         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1163                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1164         if (recdb == NULL) {
1165                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1166         }
1167
1168         talloc_free(name);
1169
1170         return recdb;
1171 }
1172
1173
1174 /* 
1175    a traverse function for pulling all relevant records from recdb
1176  */
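/* state carried through the recdb traverse: the marshall buffer being
   built, how many bytes of it are used and allocated, and whether the
   database being pushed is persistent */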
1177 struct recdb_data {
1178         struct ctdb_context *ctdb;
1179         struct ctdb_marshall_buffer *recdata;
1180         uint32_t len;
1181         uint32_t allocated_len;
1182         bool failed;
1183         bool persistent;
1184 };
1185
1186 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1187 {
1188         struct recdb_data *params = (struct recdb_data *)p;
1189         struct ctdb_rec_data *rec;
1190         struct ctdb_ltdb_header *hdr;
1191
1192         /* skip empty records */
1193         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1194                 return 0;
1195         }
1196
1197         /* update the dmaster field to point to us */
1198         hdr = (struct ctdb_ltdb_header *)data.dptr;
1199         if (!params->persistent) {
1200                 hdr->dmaster = params->ctdb->pnn;
1201                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1202         }
1203
1204         /* add the record to the blob ready to send to the nodes */
1205         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1206         if (rec == NULL) {
1207                 params->failed = true;
1208                 return -1;
1209         }
1210         if (params->len + rec->length >= params->allocated_len) {
1211                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1212                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1213         }
1214         if (params->recdata == NULL) {
1215                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n", 
1216                          rec->length + params->len));
1217                 params->failed = true;
1218                 return -1;
1219         }
1220         params->recdata->count++;
1221         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1222         params->len += rec->length;
1223         talloc_free(rec);
1224
1225         return 0;
1226 }
1227
1228 /*
1229   push the recdb database out to all nodes
1230  */
1231 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1232                                bool persistent,
1233                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1234 {
1235         struct recdb_data params;
1236         struct ctdb_marshall_buffer *recdata;
1237         TDB_DATA outdata;
1238         TALLOC_CTX *tmp_ctx;
1239         uint32_t *nodes;
1240
1241         tmp_ctx = talloc_new(ctdb);
1242         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1243
1244         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1245         CTDB_NO_MEMORY(ctdb, recdata);
1246
1247         recdata->db_id = dbid;
1248
1249         params.ctdb = ctdb;
1250         params.recdata = recdata;
1251         params.len = offsetof(struct ctdb_marshall_buffer, data);
1252         params.allocated_len = params.len;
1253         params.failed = false;
1254         params.persistent = persistent;
1255
1256         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1257                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1258                 talloc_free(params.recdata);
1259                 talloc_free(tmp_ctx);
1260                 return -1;
1261         }
1262
1263         if (params.failed) {
1264                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1265                 talloc_free(params.recdata);
1266                 talloc_free(tmp_ctx);
1267                 return -1;              
1268         }
1269
1270         recdata = params.recdata;
1271
1272         outdata.dptr = (void *)recdata;
1273         outdata.dsize = params.len;
1274
1275         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1276         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1277                                         nodes, 0,
1278                                         CONTROL_TIMEOUT(), false, outdata,
1279                                         NULL, NULL,
1280                                         NULL) != 0) {
1281                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1282                 talloc_free(recdata);
1283                 talloc_free(tmp_ctx);
1284                 return -1;
1285         }
1286
1287         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n", 
1288                   dbid, recdata->count));
1289
1290         talloc_free(recdata);
1291         talloc_free(tmp_ctx);
1292
1293         return 0;
1294 }
1295
1296
1297 /*
1298   go through a full recovery on one database 
1299  */
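/* a rough outline of the steps below: pull the database from the other
   nodes into a temporary recdb (merging record versions by rsn, or taking
   the copy with the highest sequence number for persistent databases when
   ctdb->tunable.recover_pdb_by_seqnum is set), wipe the database on all
   active nodes under the recovery transaction, then push the merged
   contents back out to every node */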
1300 static int recover_database(struct ctdb_recoverd *rec, 
1301                             TALLOC_CTX *mem_ctx,
1302                             uint32_t dbid,
1303                             bool persistent,
1304                             uint32_t pnn, 
1305                             struct ctdb_node_map *nodemap,
1306                             uint32_t transaction_id)
1307 {
1308         struct tdb_wrap *recdb;
1309         int ret;
1310         struct ctdb_context *ctdb = rec->ctdb;
1311         TDB_DATA data;
1312         struct ctdb_control_wipe_database w;
1313         uint32_t *nodes;
1314
1315         recdb = create_recdb(ctdb, mem_ctx);
1316         if (recdb == NULL) {
1317                 return -1;
1318         }
1319
1320         /* pull all remote copies of this database into the recdb */
1321         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1322         if (ret != 0) {
1323                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1324                 return -1;
1325         }
1326
1327         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1328
1329         /* wipe all the remote databases. This is safe as we are in a transaction */
1330         w.db_id = dbid;
1331         w.transaction_id = transaction_id;
1332
1333         data.dptr = (void *)&w;
1334         data.dsize = sizeof(w);
1335
1336         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1337         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1338                                         nodes, 0,
1339                                         CONTROL_TIMEOUT(), false, data,
1340                                         NULL, NULL,
1341                                         NULL) != 0) {
1342                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1343                 talloc_free(recdb);
1344                 return -1;
1345         }
1346         
1347         /* push out the correct database. This sets the dmaster and skips 
1348            the empty records */
1349         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1350         if (ret != 0) {
1351                 talloc_free(recdb);
1352                 return -1;
1353         }
1354
1355         /* all done with this database */
1356         talloc_free(recdb);
1357
1358         return 0;
1359 }
1360
1361 /*
1362   reload the nodes file 
1363 */
1364 static void reload_nodes_file(struct ctdb_context *ctdb)
1365 {
1366         ctdb->nodes = NULL;
1367         ctdb_load_nodes_file(ctdb);
1368 }
1369
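/* refresh our cached view of every active node's public IP lists: both
   the IPs the node knows about and the ones it can actually host.  On
   failure the offending node is reported through "culprit" */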
1370 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1371                                          struct ctdb_recoverd *rec,
1372                                          struct ctdb_node_map *nodemap,
1373                                          uint32_t *culprit)
1374 {
1375         int j;
1376         int ret;
1377
1378         if (ctdb->num_nodes != nodemap->num) {
1379                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1380                                   ctdb->num_nodes, nodemap->num));
1381                 if (culprit) {
1382                         *culprit = ctdb->pnn;
1383                 }
1384                 return -1;
1385         }
1386
1387         for (j=0; j<nodemap->num; j++) {
1388                 /* release any existing data */
1389                 if (ctdb->nodes[j]->known_public_ips) {
1390                         talloc_free(ctdb->nodes[j]->known_public_ips);
1391                         ctdb->nodes[j]->known_public_ips = NULL;
1392                 }
1393                 if (ctdb->nodes[j]->available_public_ips) {
1394                         talloc_free(ctdb->nodes[j]->available_public_ips);
1395                         ctdb->nodes[j]->available_public_ips = NULL;
1396                 }
1397
1398                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1399                         continue;
1400                 }
1401
1402                 /* grab a new shiny list of known public ips from the node */
1403                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1404                                         CONTROL_TIMEOUT(),
1405                                         ctdb->nodes[j]->pnn,
1406                                         ctdb->nodes,
1407                                         0,
1408                                         &ctdb->nodes[j]->known_public_ips);
1409                 if (ret != 0) {
1410                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1411                                 ctdb->nodes[j]->pnn));
1412                         if (culprit) {
1413                                 *culprit = ctdb->nodes[j]->pnn;
1414                         }
1415                         return -1;
1416                 }
1417
1418                 if (ctdb->do_checkpublicip) {
1419                         if (rec->ip_check_disable_ctx == NULL) {
1420                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1421                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1422                                         rec->need_takeover_run = true;
1423                                 }
1424                         }
1425                 }
1426
1427                 /* grab the list of public ips this node can actually serve (available ips) */
1428                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1429                                         CONTROL_TIMEOUT(),
1430                                         ctdb->nodes[j]->pnn,
1431                                         ctdb->nodes,
1432                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1433                                         &ctdb->nodes[j]->available_public_ips);
1434                 if (ret != 0) {
1435                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1436                                 ctdb->nodes[j]->pnn));
1437                         if (culprit) {
1438                                 *culprit = ctdb->nodes[j]->pnn;
1439                         }
1440                         return -1;
1441                 }
1442         }
1443
1444         return 0;
1445 }
1446
1447 /* when we start a recovery, make sure all nodes use the same reclock file
1448    setting
1449 */
1450 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1451 {
1452         struct ctdb_context *ctdb = rec->ctdb;
1453         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1454         TDB_DATA data;
1455         uint32_t *nodes;
1456
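             /* an empty payload means no reclock file is configured locally;
                otherwise the path is sent including its terminating NUL */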
1457         if (ctdb->recovery_lock_file == NULL) {
1458                 data.dptr  = NULL;
1459                 data.dsize = 0;
1460         } else {
1461                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1462                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1463         }
1464
1465         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1466         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1467                                         nodes, 0,
1468                                         CONTROL_TIMEOUT(),
1469                                         false, data,
1470                                         NULL, NULL,
1471                                         rec) != 0) {
1472                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1473                 talloc_free(tmp_ctx);
1474                 return -1;
1475         }
1476
1477         talloc_free(tmp_ctx);
1478         return 0;
1479 }
1480
1481
1482 /*
1483   we are the recmaster, and recovery is needed - start a recovery run
1484  */
1485 static int do_recovery(struct ctdb_recoverd *rec, 
1486                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1487                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1488 {
1489         struct ctdb_context *ctdb = rec->ctdb;
1490         int i, j, ret;
1491         uint32_t generation;
1492         struct ctdb_dbid_map *dbmap;
1493         TDB_DATA data;
1494         uint32_t *nodes;
1495         struct timeval start_time;
1496         uint32_t culprit = (uint32_t)-1;
1497
1498         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1499
1500         /* if recovery fails, force it again */
1501         rec->need_recovery = true;
1502
1503         for (i=0; i<ctdb->num_nodes; i++) {
1504                 struct ctdb_banning_state *ban_state;
1505
1506                 if (ctdb->nodes[i]->ban_state == NULL) {
1507                         continue;
1508                 }
1509                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
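                     /* only ban a node once it has been blamed for at least
                        twice as many recoveries as there are nodes */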
1510                 if (ban_state->count < 2*ctdb->num_nodes) {
1511                         continue;
1512                 }
1513                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1514                         ctdb->nodes[i]->pnn, ban_state->count,
1515                         ctdb->tunable.recovery_ban_period));
1516                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1517                 ban_state->count = 0;
1518         }
1519
1520
1521         if (ctdb->tunable.verify_recovery_lock != 0) {
1522                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1523                 start_time = timeval_current();
1524                 if (!ctdb_recovery_lock(ctdb, true)) {
1525                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1526                                          "and banning ourselves for %u seconds\n",
1527                                          ctdb->tunable.recovery_ban_period));
1528                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1529                         return -1;
1530                 }
1531                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1532                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1533         }
1534
1535         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1536
1537         /* get a list of all databases */
1538         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1539         if (ret != 0) {
1540                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1541                 return -1;
1542         }
1543
1544         /* we do the db creation before we set the recovery mode, so the freeze happens
1545            on all databases we will be dealing with. */
1546
1547         /* verify that we have all the databases any other node has */
1548         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1549         if (ret != 0) {
1550                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1551                 return -1;
1552         }
1553
1554         /* verify that all other nodes have all our databases */
1555         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1556         if (ret != 0) {
1557                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1558                 return -1;
1559         }
1560         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1561
1562         /* update the database priority for all remote databases */
1563         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1564         if (ret != 0) {
1565                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1566         }
1567         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1568
1569
1570         /* update all other nodes to use the same setting for reclock files
1571            as the local recovery master.
1572         */
1573         sync_recovery_lock_file_across_cluster(rec);
1574
1575         /* set recovery mode to active on all nodes */
1576         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1577         if (ret != 0) {
1578                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1579                 return -1;
1580         }
1581
1582         /* execute the "startrecovery" event script on all nodes */
1583         ret = run_startrecovery_eventscript(rec, nodemap);
1584         if (ret!=0) {
1585                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1586                 return -1;
1587         }
1588
1589         /*
1590           update all nodes to have the same flags that we have
1591          */
1592         for (i=0;i<nodemap->num;i++) {
1593                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1594                         continue;
1595                 }
1596
1597                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1598                 if (ret != 0) {
1599                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1600                         return -1;
1601                 }
1602         }
1603
1604         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1605
1606         /* pick a new generation number */
1607         generation = new_generation();
1608
1609         /* change the vnnmap on this node to use the new generation 
1610            number but not on any other nodes.
1611            this guarantees that if we abort the recovery prematurely
1612            for some reason (a node stops responding?)
1613            that we can just return immediately and we will reenter
1614            recovery shortly again.
1615            I.e. we deliberately leave the cluster with an inconsistent
1616            generation id to allow us to abort recovery at any stage and
1617            just restart it from scratch.
1618          */
1619         vnnmap->generation = generation;
1620         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1621         if (ret != 0) {
1622                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1623                 return -1;
1624         }
1625
1626         data.dptr = (void *)&generation;
1627         data.dsize = sizeof(uint32_t);
1628
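             /* the new generation id is the payload for the transaction
                start control sent to all active nodes */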
1629         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1630         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1631                                         nodes, 0,
1632                                         CONTROL_TIMEOUT(), false, data,
1633                                         NULL,
1634                                         transaction_start_fail_callback,
1635                                         rec) != 0) {
1636                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1637                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1638                                         nodes, 0,
1639                                         CONTROL_TIMEOUT(), false, tdb_null,
1640                                         NULL,
1641                                         NULL,
1642                                         NULL) != 0) {
1643                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1644                 }
1645                 return -1;
1646         }
1647
1648         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1649
1650         for (i=0;i<dbmap->num;i++) {
1651                 ret = recover_database(rec, mem_ctx,
1652                                        dbmap->dbs[i].dbid,
1653                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1654                                        pnn, nodemap, generation);
1655                 if (ret != 0) {
1656                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1657                         return -1;
1658                 }
1659         }
1660
1661         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1662
1663         /* commit all the changes */
1664         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1665                                         nodes, 0,
1666                                         CONTROL_TIMEOUT(), false, data,
1667                                         NULL, NULL,
1668                                         NULL) != 0) {
1669                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1670                 return -1;
1671         }
1672
1673         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1674         
1675
1676         /* update the capabilities for all nodes */
1677         ret = update_capabilities(ctdb, nodemap);
1678         if (ret!=0) {
1679                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1680                 return -1;
1681         }
1682
1683         /* build a new vnn map with all the currently active and
1684            unbanned nodes */
1685         generation = new_generation();
1686         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1687         CTDB_NO_MEMORY(ctdb, vnnmap);
1688         vnnmap->generation = generation;
1689         vnnmap->size = 0;
1690         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1691         CTDB_NO_MEMORY(ctdb, vnnmap->map);
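             /* i walks the nodemap while j indexes the growing vnnmap;
                only active nodes with the LMASTER capability are added */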
1692         for (i=j=0;i<nodemap->num;i++) {
1693                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1694                         continue;
1695                 }
1696                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1697                         /* this node cannot be an lmaster */
1698                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1699                         continue;
1700                 }
1701
1702                 vnnmap->size++;
1703                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1704                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1705                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1706
1707         }
1708         if (vnnmap->size == 0) {
1709                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1710                 vnnmap->size++;
1711                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1712                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1713                 vnnmap->map[0] = pnn;
1714         }       
1715
1716         /* update to the new vnnmap on all nodes */
1717         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1718         if (ret != 0) {
1719                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1720                 return -1;
1721         }
1722
1723         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1724
1725         /* update recmaster to point to us for all nodes */
1726         ret = set_recovery_master(ctdb, nodemap, pnn);
1727         if (ret!=0) {
1728                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1729                 return -1;
1730         }
1731
1732         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1733
1734         /*
1735           update all nodes to have the same flags that we have
1736          */
1737         for (i=0;i<nodemap->num;i++) {
1738                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1739                         continue;
1740                 }
1741
1742                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1743                 if (ret != 0) {
1744                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1745                         return -1;
1746                 }
1747         }
1748
1749         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1750
1751         /* disable recovery mode */
1752         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1753         if (ret != 0) {
1754                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1755                 return -1;
1756         }
1757
1758         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1759
1760         /*
1761           tell nodes to takeover their public IPs
1762          */
1763         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1764         if (ret != 0) {
1765                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1766                                  culprit));
1767                 rec->need_takeover_run = true;
1768                 return -1;
1769         }
1770         rec->need_takeover_run = false;
1771         ret = ctdb_takeover_run(ctdb, nodemap);
1772         if (ret != 0) {
1773                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1774                 rec->need_takeover_run = true;
1775         }
1776
1777         /* execute the "recovered" event script on all nodes */
1778         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1779         if (ret!=0) {
1780                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1781                 return -1;
1782         }
1783
1784         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1785
1786         /* send a message to all clients telling them that the cluster 
1787            has been reconfigured */
1788         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1789
1790         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1791
1792         rec->need_recovery = false;
1793
1794         /* we managed to complete a full recovery, make sure to forgive
1795            any past sins by the nodes that could now participate in the
1796            recovery.
1797         */
1798         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1799         for (i=0;i<nodemap->num;i++) {
1800                 struct ctdb_banning_state *ban_state;
1801
1802                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1803                         continue;
1804                 }
1805
1806                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1807                 if (ban_state == NULL) {
1808                         continue;
1809                 }
1810
1811                 ban_state->count = 0;
1812         }
1813
1814
1815         /* We just finished a recovery successfully. 
1816            We now wait for rerecovery_timeout before we allow 
1817            another recovery to take place.
1818         */
1819         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1820         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1821         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1822
1823         return 0;
1824 }
1825
1826
1827 /*
1828   elections are won by first checking the number of connected nodes, then
1829   the priority time, then the pnn
1830  */
1831 struct election_message {
1832         uint32_t num_connected;
1833         struct timeval priority_time;
1834         uint32_t pnn;
1835         uint32_t node_flags;
1836 };
1837
1838 /*
1839   form this node's election data
1840  */
1841 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1842 {
1843         int ret, i;
1844         struct ctdb_node_map *nodemap;
1845         struct ctdb_context *ctdb = rec->ctdb;
1846
1847         ZERO_STRUCTP(em);
1848
1849         em->pnn = rec->ctdb->pnn;
1850         em->priority_time = rec->priority_time;
1851
1852         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1853         if (ret != 0) {
1854                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1855                 return;
1856         }
1857
1858         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1859         em->node_flags = rec->node_flags;
1860
1861         for (i=0;i<nodemap->num;i++) {
1862                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1863                         em->num_connected++;
1864                 }
1865         }
1866
1867         /* we shouldn't try to win this election if we can't be a recmaster */
1868         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1869                 em->num_connected = 0;
1870                 em->priority_time = timeval_current();
1871         }
1872
1873         talloc_free(nodemap);
1874 }
1875
1876 /*
1877   see if the given election data wins
1878  */
1879 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1880 {
1881         struct election_message myem;
1882         int cmp = 0;
1883
1884         ctdb_election_data(rec, &myem);
1885
1886         /* we can't win if we don't have the recmaster capability */
1887         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1888                 return false;
1889         }
1890
1891         /* we can't win if we are banned */
1892         if (rec->node_flags & NODE_FLAGS_BANNED) {
1893                 return false;
1894         }       
1895
1896         /* we can't win if we are stopped */
1897         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1898                 return false;
1899         }       
1900
1901         /* we will automatically win if the other node is banned */
1902         if (em->node_flags & NODE_FLAGS_BANNED) {
1903                 return true;
1904         }
1905
1906         /* we will automatically win if the other node is stopped */
1907         if (em->node_flags & NODE_FLAGS_STOPPED) {
1908                 return true;
1909         }
1910
1911         /* try to use the most connected node */
1912         if (cmp == 0) {
1913                 cmp = (int)myem.num_connected - (int)em->num_connected;
1914         }
1915
1916         /* then the longest running node */
1917         if (cmp == 0) {
1918                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1919         }
1920
1921         if (cmp == 0) {
1922                 cmp = (int)myem.pnn - (int)em->pnn;
1923         }
1924
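             /* a positive cmp means our own election data beats the incoming
                one, so we should contest the election */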
1925         return cmp > 0;
1926 }
1927
1928 /*
1929   send out an election request
1930  */
1931 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1932 {
1933         int ret;
1934         TDB_DATA election_data;
1935         struct election_message emsg;
1936         uint64_t srvid;
1937         struct ctdb_context *ctdb = rec->ctdb;
1938
1939         srvid = CTDB_SRVID_RECOVERY;
1940
1941         ctdb_election_data(rec, &emsg);
1942
1943         election_data.dsize = sizeof(struct election_message);
1944         election_data.dptr  = (unsigned char *)&emsg;
1945
1946
1947         /* send an election message to all active nodes */
1948         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1949         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1950
1951
1952         /* A new node that is already frozen has entered the cluster.
1953            The existing nodes are not frozen and don't need to be frozen
1954            until the election has ended and we start the actual recovery
1955         */
1956         if (update_recmaster == true) {
1957                 /* first we assume we will win the election and set 
1958                    recovery master to be ourselves on the current node
1959                  */
1960                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1961                 if (ret != 0) {
1962                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1963                         return -1;
1964                 }
1965         }
1966
1967
1968         return 0;
1969 }
1970
1971 /*
1972   this function will unban all nodes in the cluster
1973 */
1974 static void unban_all_nodes(struct ctdb_context *ctdb)
1975 {
1976         int ret, i;
1977         struct ctdb_node_map *nodemap;
1978         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1979         
1980         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1981         if (ret != 0) {
1982                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1983                 return;
1984         }
1985
1986         for (i=0;i<nodemap->num;i++) {
1987                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1988                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1989                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1990                 }
1991         }
1992
1993         talloc_free(tmp_ctx);
1994 }
1995
1996
1997 /*
1998   we think we are winning the election - send a broadcast election request
1999  */
2000 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2001 {
2002         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2003         int ret;
2004
2005         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
2006         if (ret != 0) {
2007                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2008         }
2009
2010         talloc_free(rec->send_election_te);
2011         rec->send_election_te = NULL;
2012 }
2013
2014 /*
2015   handler for memory dumps
2016 */
2017 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2018                              TDB_DATA data, void *private_data)
2019 {
2020         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2021         TDB_DATA *dump;
2022         int ret;
2023         struct rd_memdump_reply *rd;
2024
2025         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2026                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2027                 talloc_free(tmp_ctx);
2028                 return;
2029         }
2030         rd = (struct rd_memdump_reply *)data.dptr;
2031
2032         dump = talloc_zero(tmp_ctx, TDB_DATA);
2033         if (dump == NULL) {
2034                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2035                 talloc_free(tmp_ctx);
2036                 return;
2037         }
2038         ret = ctdb_dump_memory(ctdb, dump);
2039         if (ret != 0) {
2040                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2041                 talloc_free(tmp_ctx);
2042                 return;
2043         }
2044
2045         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2046
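             /* post the collected dump back to the requester identified by
                rd->pnn and rd->srvid */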
2047         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2048         if (ret != 0) {
2049                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2050                 talloc_free(tmp_ctx);
2051                 return;
2052         }
2053
2054         talloc_free(tmp_ctx);
2055 }
2056
2057 /*
2058   handler for reload_nodes
2059 */
2060 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2061                              TDB_DATA data, void *private_data)
2062 {
2063         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2064
2065         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2066
2067         reload_nodes_file(rec->ctdb);
2068 }
2069
2070
2071 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2072                               struct timeval yt, void *p)
2073 {
2074         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2075
2076         talloc_free(rec->ip_check_disable_ctx);
2077         rec->ip_check_disable_ctx = NULL;
2078 }
2079
2080
2081 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2082                                   struct timeval t, void *p)
2083 {
2084         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2085         struct ctdb_context *ctdb = rec->ctdb;
2086         int ret;
2087
2088         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2089
2090         ret = ctdb_takeover_run(ctdb, rec->nodemap);
2091         if (ret != 0) {
2092                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2093                 rec->need_takeover_run = true;
2094         }
2095
2096         talloc_free(rec->deferred_rebalance_ctx);
2097         rec->deferred_rebalance_ctx = NULL;
2098 }
2099
2100         
2101 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2102                              TDB_DATA data, void *private_data)
2103 {
2104         uint32_t pnn;
2105         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2106
2107         if (data.dsize != sizeof(uint32_t)) {
2108                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2109                 return;
2110         }
2111
2112         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2113                 return;
2114         }
2115
2116         pnn = *(uint32_t *)&data.dptr[0];
2117
2118         lcp2_forcerebalance(ctdb, pnn);
2119         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2120
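             /* freeing an existing context cancels any rebalance that is
                already scheduled; a fresh timer is armed below */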
2121         if (rec->deferred_rebalance_ctx != NULL) {
2122                 talloc_free(rec->deferred_rebalance_ctx);
2123         }
2124         rec->deferred_rebalance_ctx = talloc_new(rec);
2125         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2126                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2127                         ctdb_rebalance_timeout, rec);
2128 }
2129
2130
2131
2132 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2133                              TDB_DATA data, void *private_data)
2134 {
2135         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2136         struct ctdb_public_ip *ip;
2137
2138         if (rec->recmaster != rec->ctdb->pnn) {
2139                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2140                 return;
2141         }
2142
2143         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2144                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2145                 return;
2146         }
2147
2148         ip = (struct ctdb_public_ip *)data.dptr;
2149
2150         update_ip_assignment_tree(rec->ctdb, ip);
2151 }
2152
2153
2154 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2155                              TDB_DATA data, void *private_data)
2156 {
2157         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2158         uint32_t timeout;
2159
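             /* the re-enable event below is allocated on ip_check_disable_ctx,
                so freeing an existing context cancels a pending re-enable */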
2160         if (rec->ip_check_disable_ctx != NULL) {
2161                 talloc_free(rec->ip_check_disable_ctx);
2162                 rec->ip_check_disable_ctx = NULL;
2163         }
2164
2165         if (data.dsize != sizeof(uint32_t)) {
2166                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2167                                  "expecting %lu\n", (long unsigned)data.dsize,
2168                                  (long unsigned)sizeof(uint32_t)));
2169                 return;
2170         }
2171         if (data.dptr == NULL) {
2172                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2173                 return;
2174         }
2175
2176         timeout = *((uint32_t *)data.dptr);
2177         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2178
2179         rec->ip_check_disable_ctx = talloc_new(rec);
2180         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2181
2182         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2183 }
2184
2185
2186 /*
2187   handler for reload all ips.
2188 */
2189 static void ip_reloadall_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2190                              TDB_DATA data, void *private_data)
2191 {
2192         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2193
2194         if (data.dsize != sizeof(struct reloadips_all_reply)) {
2195                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2196                 return;
2197         }
2198
2199         reload_all_ips_request = (struct reloadips_all_reply *)talloc_steal(rec, data.dptr);
2200
2201         DEBUG(DEBUG_NOTICE,("RELOAD_ALL_IPS message received from node:%d srvid:%d\n", reload_all_ips_request->pnn, (int)reload_all_ips_request->srvid));
2202         return;
2203 }
2204
2205 static void async_reloadips_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2206 {
2207         uint32_t *status = callback_data;
2208
2209         if (res != 0) {
2210                 DEBUG(DEBUG_ERR,("Reload ips all failed on node %d\n", node_pnn));
2211                 *status = 1;
2212         }
2213 }
2214
2215 static int
2216 reload_all_ips(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, struct reloadips_all_reply *rips)
2217 {
2218         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2219         uint32_t *nodes;
2220         uint32_t status;
2221         int i;
2222
2223         DEBUG(DEBUG_ERR,("RELOAD ALL IPS on all active nodes\n"));
2224         for (i = 0; i< nodemap->num; i++) {
2225                 if (nodemap->nodes[i].flags != 0) {
2226                         DEBUG(DEBUG_ERR, ("Can not reload ips on all nodes. Node %d is not up and healthy\n", i));
2227                         talloc_free(tmp_ctx);
2228                         return -1;
2229                 }
2230         }
2231
2232         /* tell all connected nodes to reload their public ips */
2233         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2234         status = 0;
2235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_RELOAD_PUBLIC_IPS,
2236                                         nodes, 0,
2237                                         CONTROL_TIMEOUT(),
2238                                         false, tdb_null,
2239                                         async_reloadips_callback, NULL,
2240                                         &status) != 0) {
2241                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2242                 talloc_free(tmp_ctx);
2243                 return -1;
2244         }
2245
2246         if (status != 0) {
2247                 DEBUG(DEBUG_ERR, (__location__ " Failed to reloadips on all nodes.\n"));
2248                 talloc_free(tmp_ctx);
2249                 return -1;
2250         }
2251
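             /* tell the node that originally requested the reload that it
                has completed */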
2252         ctdb_client_send_message(ctdb, rips->pnn, rips->srvid, tdb_null);
2253
2254         talloc_free(tmp_ctx);
2255         return 0;
2256 }
2257
2258
2259 /*
2260   handler for ip reallocate, just add it to the list of callers and 
2261   handle this later in the monitor_cluster loop so we do not recurse
2262   with other callers to takeover_run()
2263 */
2264 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2265                              TDB_DATA data, void *private_data)
2266 {
2267         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2268         struct ip_reallocate_list *caller;
2269
2270         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2271                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2272                 return;
2273         }
2274
2275         if (rec->ip_reallocate_ctx == NULL) {
2276                 rec->ip_reallocate_ctx = talloc_new(rec);
2277                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2278         }
2279
2280         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2281         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2282
2283         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2284         caller->next = rec->reallocate_callers;
2285         rec->reallocate_callers = caller;
2286
2287         return;
2288 }
2289
2290 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2291 {
2292         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2293         TDB_DATA result;
2294         int32_t ret;
2295         struct ip_reallocate_list *callers;
2296         uint32_t culprit;
2297
2298         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2299
2300         /* update the list of public ips that a node can handle for
2301            all connected nodes
2302         */
2303         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2304         if (ret != 0) {
2305                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2306                                  culprit));
2307                 rec->need_takeover_run = true;
2308         }
2309         if (ret == 0) {
2310                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2311                 if (ret != 0) {
2312                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2313                         rec->need_takeover_run = true;
2314                 }
2315         }
2316
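             /* pack the overall int32 result so each queued caller learns
                whether the reallocation succeeded */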
2317         result.dsize = sizeof(int32_t);
2318         result.dptr  = (uint8_t *)&ret;
2319
2320         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2321
2322                 /* Someone that sent srvid==0 does not want a reply */
2323                 if (callers->rd->srvid == 0) {
2324                         continue;
2325                 }
2326                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2327                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2328                                   (unsigned long long)callers->rd->srvid));
2329                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2330                 if (ret != 0) {
2331                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2332                                          "message to %u:%llu\n",
2333                                          (unsigned)callers->rd->pnn,
2334                                          (unsigned long long)callers->rd->srvid));
2335                 }
2336         }
2337
2338         talloc_free(tmp_ctx);
2339         talloc_free(rec->ip_reallocate_ctx);
2340         rec->ip_reallocate_ctx = NULL;
2341         rec->reallocate_callers = NULL;
2342         
2343 }
2344
2345
2346 /*
2347   handler for recovery master elections
2348 */
2349 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2350                              TDB_DATA data, void *private_data)
2351 {
2352         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2353         int ret;
2354         struct election_message *em = (struct election_message *)data.dptr;
2355         TALLOC_CTX *mem_ctx;
2356
2357         /* we got an election packet - update the timeout for the election */
2358         talloc_free(rec->election_timeout);
2359         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2360                                                 fast_start ?
2361                                                 timeval_current_ofs(0, 500000) :
2362                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2363                                                 ctdb_election_timeout, rec);
2364
2365         mem_ctx = talloc_new(ctdb);
2366
2367         /* someone called an election. check their election data
2368            and if we disagree and we would rather be the elected node, 
2369            send a new election message to all other nodes
2370          */
2371         if (ctdb_election_win(rec, em)) {
2372                 if (!rec->send_election_te) {
2373                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2374                                                                 timeval_current_ofs(0, 500000),
2375                                                                 election_send_request, rec);
2376                 }
2377                 talloc_free(mem_ctx);
2378                 /*unban_all_nodes(ctdb);*/
2379                 return;
2380         }
2381         
2382         /* we didn't win */
2383         talloc_free(rec->send_election_te);
2384         rec->send_election_te = NULL;
2385
2386         if (ctdb->tunable.verify_recovery_lock != 0) {
2387                 /* release the recmaster lock */
2388                 if (em->pnn != ctdb->pnn &&
2389                     ctdb->recovery_lock_fd != -1) {
2390                         close(ctdb->recovery_lock_fd);
2391                         ctdb->recovery_lock_fd = -1;
2392                         unban_all_nodes(ctdb);
2393                 }
2394         }
2395
2396         /* ok, let that guy become recmaster then */
2397         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2398         if (ret != 0) {
2399                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2400                 talloc_free(mem_ctx);
2401                 return;
2402         }
2403
2404         talloc_free(mem_ctx);
2405         return;
2406 }
2407
2408
2409 /*
2410   force the start of the election process
2411  */
2412 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2413                            struct ctdb_node_map *nodemap)
2414 {
2415         int ret;
2416         struct ctdb_context *ctdb = rec->ctdb;
2417
2418         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2419
2420         /* set all nodes to recovery mode to stop all internode traffic */
2421         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2422         if (ret != 0) {
2423                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2424                 return;
2425         }
2426
2427         talloc_free(rec->election_timeout);
2428         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2429                                                 fast_start ?
2430                                                 timeval_current_ofs(0, 500000) :
2431                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2432                                                 ctdb_election_timeout, rec);
2433
2434         ret = send_election_request(rec, pnn, true);
2435         if (ret!=0) {
2436                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2437                 return;
2438         }
2439
2440         /* wait for a few seconds to collect all responses */
2441         ctdb_wait_election(rec);
2442 }
2443
2444
2445
2446 /*
2447   handler for when a node changes its flags
2448 */
2449 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2450                             TDB_DATA data, void *private_data)
2451 {
2452         int ret;
2453         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2454         struct ctdb_node_map *nodemap=NULL;
2455         TALLOC_CTX *tmp_ctx;
2456         int i;
2457         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2458         int disabled_flag_changed;
2459
2460         if (data.dsize != sizeof(*c)) {
2461                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2462                 return;
2463         }
2464
2465         tmp_ctx = talloc_new(ctdb);
2466         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2467
2468         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2469         if (ret != 0) {
2470                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2471                 talloc_free(tmp_ctx);
2472                 return;         
2473         }
2474
2475
2476         for (i=0;i<nodemap->num;i++) {
2477                 if (nodemap->nodes[i].pnn == c->pnn) break;
2478         }
2479
2480         if (i == nodemap->num) {
2481                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existent node %u\n", c->pnn));
2482                 talloc_free(tmp_ctx);
2483                 return;
2484         }
2485
2486         if (nodemap->nodes[i].flags != c->new_flags) {
2487                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, nodemap->nodes[i].flags));
2488         }
2489
2490         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2491
2492         nodemap->nodes[i].flags = c->new_flags;
2493
2494         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2495                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2496
2497         if (ret == 0) {
2498                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2499                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2500         }
2501         
2502         if (ret == 0 &&
2503             ctdb->recovery_master == ctdb->pnn &&
2504             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2505                 /* Only do the takeover run if the perm disabled or unhealthy
2506                    flags changed since these will cause an ip failover but not
2507                    a recovery.
2508                    If the node became disconnected or banned this will also
2509                    lead to an ip address failover but that is handled 
2510                    during recovery
2511                 */
2512                 if (disabled_flag_changed) {
2513                         rec->need_takeover_run = true;
2514                 }
2515         }
2516
2517         talloc_free(tmp_ctx);
2518 }
2519
2520 /*
2521   handler for when we need to push out flag changes to all other nodes
2522 */
2523 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2524                             TDB_DATA data, void *private_data)
2525 {
2526         int ret;
2527         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2528         struct ctdb_node_map *nodemap=NULL;
2529         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2530         uint32_t recmaster;
2531         uint32_t *nodes;
2532
2533         /* find the recovery master */
2534         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2535         if (ret != 0) {
2536                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2537                 talloc_free(tmp_ctx);
2538                 return;
2539         }
2540
2541         /* read the node flags from the recmaster */
2542         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2543         if (ret != 0) {
2544                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", recmaster));
2545                 talloc_free(tmp_ctx);
2546                 return;
2547         }
2548         if (c->pnn >= nodemap->num) {
2549                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2550                 talloc_free(tmp_ctx);
2551                 return;
2552         }
2553
2554         /* send the flags update to all connected nodes */
2555         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2556
2557         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2558                                       nodes, 0, CONTROL_TIMEOUT(),
2559                                       false, data,
2560                                       NULL, NULL,
2561                                       NULL) != 0) {
2562                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2563
2564                 talloc_free(tmp_ctx);
2565                 return;
2566         }
2567
2568         talloc_free(tmp_ctx);
2569 }
2570
2571
2572 struct verify_recmode_normal_data {
2573         uint32_t count;
2574         enum monitor_result status;
2575 };
2576
2577 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2578 {
2579         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2580
2581
2582         /* one more node has responded with recmode data*/
2583         rmdata->count--;
2584
2585         /* if we failed to get the recmode, then return an error and let
2586            the main loop try again.
2587         */
2588         if (state->state != CTDB_CONTROL_DONE) {
2589                 if (rmdata->status == MONITOR_OK) {
2590                         rmdata->status = MONITOR_FAILED;
2591                 }
2592                 return;
2593         }
2594
2595         /* if we got a response, then the recmode will be stored in the
2596            status field
2597         */
2598         if (state->status != CTDB_RECOVERY_NORMAL) {
2599                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2600                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2601         }
2602
2603         return;
2604 }
2605
2606
2607 /* verify that all nodes are in normal recovery mode */
2608 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2609 {
2610         struct verify_recmode_normal_data *rmdata;
2611         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2612         struct ctdb_client_control_state *state;
2613         enum monitor_result status;
2614         int j;
2615         
2616         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2617         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2618         rmdata->count  = 0;
2619         rmdata->status = MONITOR_OK;
2620
2621         /* loop over all active nodes and send an async getrecmode call to 
2622            them*/
2623         for (j=0; j<nodemap->num; j++) {
2624                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2625                         continue;
2626                 }
2627                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2628                                         CONTROL_TIMEOUT(), 
2629                                         nodemap->nodes[j].pnn);
2630                 if (state == NULL) {
2631                         /* we failed to send the control, treat this as 
2632                            an error and try again next iteration
2633                         */                      
2634                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2635                         talloc_free(mem_ctx);
2636                         return MONITOR_FAILED;
2637                 }
2638
2639                 /* set up the callback functions */
2640                 state->async.fn = verify_recmode_normal_callback;
2641                 state->async.private_data = rmdata;
2642
2643                 /* one more control to wait for to complete */
2644                 rmdata->count++;
2645         }
2646
2647
2648         /* now wait for up to the maximum number of seconds allowed
2649            or until all nodes we expect a response from have replied
2650         */
2651         while (rmdata->count > 0) {
2652                 event_loop_once(ctdb->ev);
2653         }
2654
2655         status = rmdata->status;
2656         talloc_free(mem_ctx);
2657         return status;
2658 }
2659
2660
2661 struct verify_recmaster_data {
2662         struct ctdb_recoverd *rec;
2663         uint32_t count;
2664         uint32_t pnn;
2665         enum monitor_result status;
2666 };
2667
2668 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2669 {
2670         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2671
2672
2673         /* one more node has responded with recmaster data*/
2674         rmdata->count--;
2675
2676         /* if we failed to get the recmaster, then return an error and let
2677            the main loop try again.
2678         */
2679         if (state->state != CTDB_CONTROL_DONE) {
2680                 if (rmdata->status == MONITOR_OK) {
2681                         rmdata->status = MONITOR_FAILED;
2682                 }
2683                 return;
2684         }
2685
2686         /* if we got a response, then the recmaster will be stored in the
2687            status field
2688         */
2689         if (state->status != rmdata->pnn) {
2690                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2691                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2692                 rmdata->status = MONITOR_ELECTION_NEEDED;
2693         }
2694
2695         return;
2696 }
2697
2698
2699 /* verify that all nodes agree that we are the recmaster */
2700 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2701 {
2702         struct ctdb_context *ctdb = rec->ctdb;
2703         struct verify_recmaster_data *rmdata;
2704         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2705         struct ctdb_client_control_state *state;
2706         enum monitor_result status;
2707         int j;
2708         
2709         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2710         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2711         rmdata->rec    = rec;
2712         rmdata->count  = 0;
2713         rmdata->pnn    = pnn;
2714         rmdata->status = MONITOR_OK;
2715
2716         /* loop over all active nodes and send an async getrecmaster call to 
2717            them*/
2718         for (j=0; j<nodemap->num; j++) {
2719                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2720                         continue;
2721                 }
2722                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2723                                         CONTROL_TIMEOUT(),
2724                                         nodemap->nodes[j].pnn);
2725                 if (state == NULL) {
2726                         /* we failed to send the control, treat this as 
2727                            an error and try again next iteration
2728                         */                      
2729                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2730                         talloc_free(mem_ctx);
2731                         return MONITOR_FAILED;
2732                 }
2733
2734                 /* set up the callback functions */
2735                 state->async.fn = verify_recmaster_callback;
2736                 state->async.private_data = rmdata;
2737
2738                 /* one more control to wait for to complete */
2739                 rmdata->count++;
2740         }
2741
2742
2743         /* now wait for up to the maximum number of seconds allowed
2744            or until all nodes we expect a response from have replied
2745         */
2746         while (rmdata->count > 0) {
2747                 event_loop_once(ctdb->ev);
2748         }
2749
2750         status = rmdata->status;
2751         talloc_free(mem_ctx);
2752         return status;
2753 }
2754
2755
2756 /* called to check that the local allocation of public ip addresses is ok.
2757 */
2758 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2759 {
2760         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2761         struct ctdb_control_get_ifaces *ifaces = NULL;
2762         struct ctdb_all_public_ips *ips = NULL;
2763         struct ctdb_uptime *uptime1 = NULL;
2764         struct ctdb_uptime *uptime2 = NULL;
2765         int ret, j;
2766         bool need_iface_check = false;
2767         bool need_takeover_run = false;
2768
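             /* Note: the uptime control also reports the last recovery start
                and finish times.  We sample it here and again after reading
                the public ip list below; if either timestamp changes in
                between, a recovery ran concurrently and the ip checks are
                skipped. */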
2769         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2770                                 CTDB_CURRENT_NODE, &uptime1);
2771         if (ret != 0) {
2772                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2773                 talloc_free(mem_ctx);
2774                 return -1;
2775         }
2776
2777
2778         /* read the interfaces from the local node */
2779         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2780         if (ret != 0) {
2781                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2782                 talloc_free(mem_ctx);
2783                 return -1;
2784         }
2785
2786         if (!rec->ifaces) {
2787                 need_iface_check = true;
2788         } else if (rec->ifaces->num != ifaces->num) {
2789                 need_iface_check = true;
2790         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2791                 need_iface_check = true;
2792         }
2793
2794         talloc_free(rec->ifaces);
2795         rec->ifaces = talloc_steal(rec, ifaces);
2796
2797         if (need_iface_check) {
2798                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2799                                      "local node %u - force takeover run\n",
2800                                      pnn));
2801                 need_takeover_run = true;
2802         }
2803
2804         /* read the ip allocation from the local node */
2805         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2806         if (ret != 0) {
2807                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2808                 talloc_free(mem_ctx);
2809                 return -1;
2810         }
2811
2812         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2813                                 CTDB_CURRENT_NODE, &uptime2);
2814         if (ret != 0) {
2815                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2816                 talloc_free(mem_ctx);
2817                 return -1;
2818         }
2819
2820         /* skip the check if the startrecovery time has changed */
2821         if (timeval_compare(&uptime1->last_recovery_started,
2822                             &uptime2->last_recovery_started) != 0) {
2823                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2824                 talloc_free(mem_ctx);
2825                 return 0;
2826         }
2827
2828         /* skip the check if the endrecovery time has changed */
2829         if (timeval_compare(&uptime1->last_recovery_finished,
2830                             &uptime2->last_recovery_finished) != 0) {
2831                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2832                 talloc_free(mem_ctx);
2833                 return 0;
2834         }
2835
2836         /* skip the check if we have started but not finished recovery */
2837         if (timeval_compare(&uptime1->last_recovery_finished,
2838                             &uptime1->last_recovery_started) != 1) {
2839                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2840                 talloc_free(mem_ctx);
2841
2842                 return 0;
2843         }
2844
2845         /* verify that we have the ip addresses we should have
2846            and we don't have ones we shouldn't have.
2847            if an address we should be serving is missing locally we ask
2848            the recmaster to perform a takeover run, and if we are holding
2849            an address we should not have we release it locally.
2850            also if an address is unassigned (pnn == -1) and we are
2851            healthy and could host it, we request an ip reallocation.
2852         */
2853         if (ctdb->tunable.disable_ip_failover == 0) {
2854                 for (j=0; j<ips->num; j++) {
2855                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2856                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2857                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2858                                 need_takeover_run = true;
2859                         } else if (ips->ips[j].pnn == pnn) {
2860                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
2861                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2862                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2863                                         need_takeover_run = true;
2864                                 }
2865                         } else {
2866                                 if (ctdb->do_checkpublicip && ctdb_sys_have_ip(&ips->ips[j].addr)) {
2867
2868                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving. Removing it.\n", 
2869                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2870
2871                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
2872                                                 DEBUG(DEBUG_ERR,("Failed to release local ip address\n"));
2873                                         }
2874                                 }
2875                         }
2876                 }
2877         }
2878
2879         if (need_takeover_run) {
2880                 struct takeover_run_reply rd;
2881                 TDB_DATA data;
2882
2883                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2884
2885                 rd.pnn = ctdb->pnn;
2886                 rd.srvid = 0;
2887                 data.dptr = (uint8_t *)&rd;
2888                 data.dsize = sizeof(rd);
2889
2890                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2891                 if (ret != 0) {
2892                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2893                 }
2894         }
2895         talloc_free(mem_ctx);
2896         return 0;
2897 }
2898
2899
2900 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2901 {
2902         struct ctdb_node_map **remote_nodemaps = callback_data;
2903
2904         if (node_pnn >= ctdb->num_nodes) {
2905                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2906                 return;
2907         }
2908
2909         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2910
2911 }
2912
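/* fan out a GET_NODEMAP control to all active nodes; async_getnodemap_callback
   above stores each reply in remote_nodemaps[], indexed by the replying
   node's pnn */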
2913 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2914         struct ctdb_node_map *nodemap,
2915         struct ctdb_node_map **remote_nodemaps)
2916 {
2917         uint32_t *nodes;
2918
2919         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2920         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2921                                         nodes, 0,
2922                                         CONTROL_TIMEOUT(), false, tdb_null,
2923                                         async_getnodemap_callback,
2924                                         NULL,
2925                                         remote_nodemaps) != 0) {
2926                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2927
2928                 return -1;
2929         }
2930
2931         return 0;
2932 }
2933
2934 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2935 struct ctdb_check_reclock_state {
2936         struct ctdb_context *ctdb;
2937         struct timeval start_time;
2938         int fd[2];
2939         pid_t child;
2940         struct timed_event *te;
2941         struct fd_event *fde;
2942         enum reclock_child_status status;
2943 };
2944
2945 /* when we free the reclock state we must kill any child process.
2946 */
2947 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2948 {
2949         struct ctdb_context *ctdb = state->ctdb;
2950
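             /* report how long this reclock check took back to the main
                daemon so it can track recovery lock latency */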
2951         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2952
2953         if (state->fd[0] != -1) {
2954                 close(state->fd[0]);
2955                 state->fd[0] = -1;
2956         }
2957         if (state->fd[1] != -1) {
2958                 close(state->fd[1]);
2959                 state->fd[1] = -1;
2960         }
2961         ctdb_kill(ctdb, state->child, SIGKILL);
2962         return 0;
2963 }
2964
2965 /*
2966   called if our check_reclock child times out. this would happen if
2967   i/o to the reclock file blocks.
2968  */
2969 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2970                                          struct timeval t, void *private_data)
2971 {
2972         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2973                                            struct ctdb_check_reclock_state);
2974
2975         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
2976         state->status = RECLOCK_TIMEOUT;
2977 }
2978
2979 /* this is called when the child process has completed checking the reclock
2980    file and has written data back to us through the pipe.
2981 */
2982 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2983                              uint16_t flags, void *private_data)
2984 {
2985         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2986                                              struct ctdb_check_reclock_state);
2987         char c = 0;
2988         int ret;
2989
2990         /* we got a response from our child process so we can abort the
2991            timeout.
2992         */
2993         talloc_free(state->te);
2994         state->te = NULL;
2995
2996         ret = read(state->fd[0], &c, 1);
2997         if (ret != 1 || c != RECLOCK_OK) {
2998                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2999                 state->status = RECLOCK_FAILED;
3000
3001                 return;
3002         }
3003
3004         state->status = RECLOCK_OK;
3005         return;
3006 }
3007
3008 static int check_recovery_lock(struct ctdb_context *ctdb)
3009 {
3010         int ret;
3011         struct ctdb_check_reclock_state *state;
3012         pid_t parent = getpid();
3013
3014         if (ctdb->recovery_lock_fd == -1) {
3015                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3016                 return -1;
3017         }
3018
3019         state = talloc(ctdb, struct ctdb_check_reclock_state);
3020         CTDB_NO_MEMORY(ctdb, state);
3021
3022         state->ctdb = ctdb;
3023         state->start_time = timeval_current();
3024         state->status = RECLOCK_CHECKING;
3025         state->fd[0] = -1;
3026         state->fd[1] = -1;
3027
3028         ret = pipe(state->fd);
3029         if (ret != 0) {
3030                 talloc_free(state);
3031                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3032                 return -1;
3033         }
3034
3035         state->child = ctdb_fork(ctdb);
3036         if (state->child == (pid_t)-1) {
3037                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3038                 close(state->fd[0]);
3039                 state->fd[0] = -1;
3040                 close(state->fd[1]);
3041                 state->fd[1] = -1;
3042                 talloc_free(state);
3043                 return -1;
3044         }
3045
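             /* Child: do a single blocking pread() on the already-open
                recovery lock fd and report the result through the pipe, then
                keep writing to the pipe every few seconds until the parent
                goes away.  Parent: wait on the pipe with a 15 second timeout
                (set up below) so that a hung cluster filesystem cannot block
                the recovery daemon itself. */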
3046         if (state->child == 0) {
3047                 char cc = RECLOCK_OK;
3048                 close(state->fd[0]);
3049                 state->fd[0] = -1;
3050
3051                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3052                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3053                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3054                         cc = RECLOCK_FAILED;
3055                 }
3056
3057                 write(state->fd[1], &cc, 1);
3058                 /* make sure we die when our parent dies */
3059                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3060                         sleep(5);
3061                         write(state->fd[1], &cc, 1);
3062                 }
3063                 _exit(0);
3064         }
3065         close(state->fd[1]);
3066         state->fd[1] = -1;
3067         set_close_on_exec(state->fd[0]);
3068
3069         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3070
3071         talloc_set_destructor(state, check_reclock_destructor);
3072
3073         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3074                                     ctdb_check_reclock_timeout, state);
3075         if (state->te == NULL) {
3076                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3077                 talloc_free(state);
3078                 return -1;
3079         }
3080
3081         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3082                                 EVENT_FD_READ,
3083                                 reclock_child_handler,
3084                                 (void *)state);
3085
3086         if (state->fde == NULL) {
3087                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3088                 talloc_free(state);
3089                 return -1;
3090         }
3091         tevent_fd_set_auto_close(state->fde);
3092
3093         while (state->status == RECLOCK_CHECKING) {
3094                 event_loop_once(ctdb->ev);
3095         }
3096
3097         if (state->status == RECLOCK_FAILED) {
3098                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3099                 close(ctdb->recovery_lock_fd);
3100                 ctdb->recovery_lock_fd = -1;
3101                 talloc_free(state);
3102                 return -1;
3103         }
3104
3105         talloc_free(state);
3106         return 0;
3107 }
3108
3109 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3110 {
3111         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3112         const char *reclockfile;
3113
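             /* fetch the current reclock file name from the main daemon and
                refresh our cached copy; if it has changed or been disabled we
                drop the cached name and close the old lock fd */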
3114         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3115                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3116                 talloc_free(tmp_ctx);
3117                 return -1;      
3118         }
3119
3120         if (reclockfile == NULL) {
3121                 if (ctdb->recovery_lock_file != NULL) {
3122                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3123                         talloc_free(ctdb->recovery_lock_file);
3124                         ctdb->recovery_lock_file = NULL;
3125                         if (ctdb->recovery_lock_fd != -1) {
3126                                 close(ctdb->recovery_lock_fd);
3127                                 ctdb->recovery_lock_fd = -1;
3128                         }
3129                 }
3130                 ctdb->tunable.verify_recovery_lock = 0;
3131                 talloc_free(tmp_ctx);
3132                 return 0;
3133         }
3134
3135         if (ctdb->recovery_lock_file == NULL) {
3136                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3137                 if (ctdb->recovery_lock_fd != -1) {
3138                         close(ctdb->recovery_lock_fd);
3139                         ctdb->recovery_lock_fd = -1;
3140                 }
3141                 talloc_free(tmp_ctx);
3142                 return 0;
3143         }
3144
3145
3146         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3147                 talloc_free(tmp_ctx);
3148                 return 0;
3149         }
3150
3151         talloc_free(ctdb->recovery_lock_file);
3152         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3153         ctdb->tunable.verify_recovery_lock = 0;
3154         if (ctdb->recovery_lock_fd != -1) {
3155                 close(ctdb->recovery_lock_fd);
3156                 ctdb->recovery_lock_fd = -1;
3157         }
3158
3159         talloc_free(tmp_ctx);
3160         return 0;
3161 }
3162
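/* one pass of the recovery daemon's monitoring logic.  Called from
   monitor_cluster() below roughly once per recover_interval seconds; error
   paths simply return and the next iteration retries. */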
3163 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3164                       TALLOC_CTX *mem_ctx)
3165 {
3166         uint32_t pnn;
3167         struct ctdb_node_map *nodemap=NULL;
3168         struct ctdb_node_map *recmaster_nodemap=NULL;
3169         struct ctdb_node_map **remote_nodemaps=NULL;
3170         struct ctdb_vnn_map *vnnmap=NULL;
3171         struct ctdb_vnn_map *remote_vnnmap=NULL;
3172         int32_t debug_level;
3173         int i, j, ret;
3174
3175
3176
3177         /* verify that the main daemon is still running */
3178         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3179                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3180                 exit(-1);
3181         }
3182
3183         /* ping the local daemon to tell it we are alive */
3184         ctdb_ctrl_recd_ping(ctdb);
3185
3186         if (rec->election_timeout) {
3187                 /* an election is in progress */
3188                 return;
3189         }
3190
3191         /* read the debug level from the parent and update locally */
3192         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3193         if (ret !=0) {
3194                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3195                 return;
3196         }
3197         LogLevel = debug_level;
3198
3199
3200         /* We must check if we need to ban a node here but we want to do this
3201            as early as possible so we don't wait until we have pulled the node
3202            map from the local node. That's why we have the hardcoded value 20
3203         */
3204         for (i=0; i<ctdb->num_nodes; i++) {
3205                 struct ctdb_banning_state *ban_state;
3206
3207                 if (ctdb->nodes[i]->ban_state == NULL) {
3208                         continue;
3209                 }
3210                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3211                 if (ban_state->count < 20) {
3212                         continue;
3213                 }
3214                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3215                         ctdb->nodes[i]->pnn, ban_state->count,
3216                         ctdb->tunable.recovery_ban_period));
3217                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3218                 ban_state->count = 0;
3219         }
3220
3221         /* get relevant tunables */
3222         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3223         if (ret != 0) {
3224                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3225                 return;
3226         }
3227
3228         /* get the current recovery lock file from the server */
3229         if (update_recovery_lock_file(ctdb) != 0) {
3230                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3231                 return;
3232         }
3233
3234         /* Make sure that if recovery lock verification becomes disabled,
3235            we close the file
3236         */
3237         if (ctdb->tunable.verify_recovery_lock == 0) {
3238                 if (ctdb->recovery_lock_fd != -1) {
3239                         close(ctdb->recovery_lock_fd);
3240                         ctdb->recovery_lock_fd = -1;
3241                 }
3242         }
3243
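             /* find our own pnn; the checks below are all expressed relative
                to it */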
3244         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3245         if (pnn == (uint32_t)-1) {
3246                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3247                 return;
3248         }
3249
3250         /* get the vnnmap */
3251         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3252         if (ret != 0) {
3253                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3254                 return;
3255         }
3256
3257
3258         /* get number of nodes */
3259         if (rec->nodemap) {
3260                 talloc_free(rec->nodemap);
3261                 rec->nodemap = NULL;
3262                 nodemap=NULL;
3263         }
3264         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3265         if (ret != 0) {
3266                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3267                 return;
3268         }
3269         nodemap = rec->nodemap;
3270
3271         /* update the capabilities for all nodes */
3272         ret = update_capabilities(ctdb, nodemap);
3273         if (ret != 0) {
3274                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3275                 return;
3276         }
3277
3278         /* check which node is the recovery master */
3279         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3280         if (ret != 0) {
3281                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3282                 return;
3283         }
3284
3285         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3286         if (rec->recmaster != pnn) {
3287                 if (rec->ip_reallocate_ctx != NULL) {
3288                         talloc_free(rec->ip_reallocate_ctx);
3289                         rec->ip_reallocate_ctx = NULL;
3290                         rec->reallocate_callers = NULL;
3291                 }
3292         }
3293
3294         if (rec->recmaster == (uint32_t)-1) {
3295                 DEBUG(DEBUG_NOTICE,(__location__ " No recovery master set yet - forcing election\n"));
3296                 force_election(rec, pnn, nodemap);
3297                 return;
3298         }
3299
3300         /* if the local daemon is STOPPED, we verify that the databases are
3301            also frozen and that the recmode is set to active
3302         */
3303         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3304                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3305                 if (ret != 0) {
3306                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3307                 }
3308                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3309                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3310
3311                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3312                         if (ret != 0) {
3313                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3314                                 return;
3315                         }
3316                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3317                         if (ret != 0) {
3318                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3319
3320                                 return;
3321                         }
3322                         return;
3323                 }
3324         }
3325         /* If the local node is stopped, verify that we are not the recmaster
3326            and yield the role if we are
3327         */
3328         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3329                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3330                 force_election(rec, pnn, nodemap);
3331                 return;
3332         }
3333         
3334         /*
3335          * if the current recmaster does not have CTDB_CAP_RECMASTER,
3336          * but we do, force an election and try to become the new
3337          * recmaster
3338          */
3339         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3340             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3341              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3342                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3343                                   " but we (node %u) have - force an election\n",
3344                                   rec->recmaster, pnn));
3345                 force_election(rec, pnn, nodemap);
3346                 return;
3347         }
3348
3349         /* check that we (recovery daemon) and the local ctdb daemon
3350            agree on whether we are banned or not
3351         */
3352 //qqq
3353
3354         /* remember our own node flags */
3355         rec->node_flags = nodemap->nodes[pnn].flags;
3356
3357         /* count how many active nodes there are */
3358         rec->num_active    = 0;
3359         rec->num_connected = 0;
3360         for (i=0; i<nodemap->num; i++) {
3361                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3362                         rec->num_active++;
3363                 }
3364                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3365                         rec->num_connected++;
3366                 }
3367         }
3368
3369
3370         /* verify that the recmaster node is still active */
3371         for (j=0; j<nodemap->num; j++) {
3372                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3373                         break;
3374                 }
3375         }
3376
3377         if (j == nodemap->num) {
3378                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3379                 force_election(rec, pnn, nodemap);
3380                 return;
3381         }
3382
3383         /* if recovery master is disconnected we must elect a new recmaster */
3384         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3385                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3386                 force_election(rec, pnn, nodemap);
3387                 return;
3388         }
3389
3390         /* grab the nodemap from the recovery master to check whether it still considers itself active */
3391         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3392                                    mem_ctx, &recmaster_nodemap);
3393         if (ret != 0) {
3394                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3395                           nodemap->nodes[j].pnn));
3396                 return;
3397         }
3398
3399
3400         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3401                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3402                 force_election(rec, pnn, nodemap);
3403                 return;
3404         }
3405
3406         /* If this node is stopped then it is not the recovery master
3407          * so the only remaining action would be to verify
3408          * the local IP allocation below.  This won't accomplish
3409          * anything useful so skip it.
3410          */
3411         if (rec->node_flags & NODE_FLAGS_STOPPED) {
3412                 return;
3413         }
3414
3415         /* verify that we have all ip addresses we should have and we don't
3416          * have addresses we shouldn't have.
3417          */ 
3418         if (ctdb->tunable.disable_ip_failover == 0) {
3419                 if (rec->ip_check_disable_ctx == NULL) {
3420                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3421                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3422                         }
3423                 }
3424         }
3425
3426
3427         /* if we are not the recmaster then we do not need to check
3428            if recovery is needed
3429          */
3430         if (pnn != rec->recmaster) {
3431                 return;
3432         }
3433
3434
3435         /* ensure our local copies of flags are right */
3436         ret = update_local_flags(rec, nodemap);
3437         if (ret == MONITOR_ELECTION_NEEDED) {
3438                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3439                 force_election(rec, pnn, nodemap);
3440                 return;
3441         }
3442         if (ret != MONITOR_OK) {
3443                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3444                 return;
3445         }
3446
3447         if (ctdb->num_nodes != nodemap->num) {
3448                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3449                 reload_nodes_file(ctdb);
3450                 return;
3451         }
3452
3453         /* verify that all active nodes agree that we are the recmaster */
3454         switch (verify_recmaster(rec, nodemap, pnn)) {
3455         case MONITOR_RECOVERY_NEEDED:
3456                 /* can not happen */
3457                 return;
3458         case MONITOR_ELECTION_NEEDED:
3459                 force_election(rec, pnn, nodemap);
3460                 return;
3461         case MONITOR_OK:
3462                 break;
3463         case MONITOR_FAILED:
3464                 return;
3465         }
3466
3467
3468         if (rec->need_recovery) {
3469                 /* a previous recovery didn't finish */
3470                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3471                 return;
3472         }
3473
3474         /* verify that all active nodes are in normal mode 
3475            and not in recovery mode 
3476         */
3477         switch (verify_recmode(ctdb, nodemap)) {
3478         case MONITOR_RECOVERY_NEEDED:
3479                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3480                 return;
3481         case MONITOR_FAILED:
3482                 return;
3483         case MONITOR_ELECTION_NEEDED:
3484                 /* can not happen */
3485         case MONITOR_OK:
3486                 break;
3487         }
3488
3489
3490         if (ctdb->tunable.verify_recovery_lock != 0) {
3491                 /* we should have the reclock - check it's not stale */
3492                 ret = check_recovery_lock(ctdb);
3493                 if (ret != 0) {
3494                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3495                         ctdb_set_culprit(rec, ctdb->pnn);
3496                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3497                         return;
3498                 }
3499         }
3500
3501
3502         /* is there a pending reload all ips ? */
3503         if (reload_all_ips_request != NULL) {
3504                 reload_all_ips(ctdb, rec, nodemap, reload_all_ips_request);
3505                 talloc_free(reload_all_ips_request);
3506                 reload_all_ips_request = NULL;
3507         }
3508
3509         /* if there are takeovers requested, perform them and notify the waiters */
3510         if (rec->reallocate_callers) {
3511                 process_ipreallocate_requests(ctdb, rec);
3512         }
3513
3514         /* get the nodemap for all active remote nodes
3515          */
3516         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3517         if (remote_nodemaps == NULL) {
3518                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3519                 return;
3520         }
3521         for(i=0; i<nodemap->num; i++) {
3522                 remote_nodemaps[i] = NULL;
3523         }
3524         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3525                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3526                 return;
3527         } 
3528
3529         /* verify that all other nodes have the same nodemap as we have
3530         */
3531         for (j=0; j<nodemap->num; j++) {
3532                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3533                         continue;
3534                 }
3535
3536                 if (remote_nodemaps[j] == NULL) {
3537                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3538                         ctdb_set_culprit(rec, j);
3539
3540                         return;
3541                 }
3542
3543                 /* if the nodes disagree on how many nodes there are
3544                    then this is a good reason to try recovery
3545                  */
3546                 if (remote_nodemaps[j]->num != nodemap->num) {
3547                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3548                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3549                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3550                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3551                         return;
3552                 }
3553
3554                 /* if the nodes disagree on which nodes exist and are
3555                    active, then that is also a good reason to do recovery
3556                  */
3557                 for (i=0;i<nodemap->num;i++) {
3558                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3559                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3560                                           nodemap->nodes[j].pnn, i, 
3561                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3562                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3563                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3564                                             vnnmap);
3565                                 return;
3566                         }
3567                 }
3568
3569                 /* verify the flags are consistent
3570                 */
3571                 for (i=0; i<nodemap->num; i++) {
3572                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3573                                 continue;
3574                         }
3575                         
3576                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3577                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3578                                   nodemap->nodes[j].pnn, 
3579                                   nodemap->nodes[i].pnn, 
3580                                   remote_nodemaps[j]->nodes[i].flags,
3581                                   nodemap->nodes[i].flags));
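                                      /* a node is authoritative for its own
                                         flags: if the disagreement concerns
                                         node j itself we push j's view to the
                                         cluster, otherwise we push the
                                         recmaster's (our) view */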
3582                                 if (i == j) {
3583                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3584                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3585                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3586                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3587                                                     vnnmap);
3588                                         return;
3589                                 } else {
3590                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3591                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3592                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3593                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3594                                                     vnnmap);
3595                                         return;
3596                                 }
3597                         }
3598                 }
3599         }
3600
3601
3602         /* there better be the same number of lmasters in the vnn map
3603            as there are active nodes or we will have to do a recovery
3604          */
3605         if (vnnmap->size != rec->num_active) {
3606                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3607                           vnnmap->size, rec->num_active));
3608                 ctdb_set_culprit(rec, ctdb->pnn);
3609                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3610                 return;
3611         }
3612
3613         /* verify that all active nodes in the nodemap also exist in 
3614            the vnnmap.
3615          */
3616         for (j=0; j<nodemap->num; j++) {
3617                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3618                         continue;
3619                 }
3620                 if (nodemap->nodes[j].pnn == pnn) {
3621                         continue;
3622                 }
3623
3624                 for (i=0; i<vnnmap->size; i++) {
3625                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3626                                 break;
3627                         }
3628                 }
3629                 if (i == vnnmap->size) {
3630                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but does not exist in the vnnmap\n", 
3631                                   nodemap->nodes[j].pnn));
3632                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3633                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3634                         return;
3635                 }
3636         }
3637
3638         
3639         /* verify that all other nodes have the same vnnmap
3640            and are from the same generation
3641          */
3642         for (j=0; j<nodemap->num; j++) {
3643                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3644                         continue;
3645                 }
3646                 if (nodemap->nodes[j].pnn == pnn) {
3647                         continue;
3648                 }
3649
3650                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3651                                           mem_ctx, &remote_vnnmap);
3652                 if (ret != 0) {
3653                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3654                                   nodemap->nodes[j].pnn));
3655                         return;
3656                 }
3657
3658                 /* verify the vnnmap generation is the same */
3659                 if (vnnmap->generation != remote_vnnmap->generation) {
3660                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3661                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3662                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3663                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3664                         return;
3665                 }
3666
3667                 /* verify the vnnmap size is the same */
3668                 if (vnnmap->size != remote_vnnmap->size) {
3669                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3670                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3671                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3672                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3673                         return;
3674                 }
3675
3676                 /* verify the vnnmap is the same */
3677                 for (i=0;i<vnnmap->size;i++) {
3678                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3679                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3680                                           nodemap->nodes[j].pnn));
3681                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3682                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3683                                             vnnmap);
3684                                 return;
3685                         }
3686                 }
3687         }
3688
3689         /* we might need to change who has what IP assigned */
3690         if (rec->need_takeover_run) {
3691                 uint32_t culprit = (uint32_t)-1;
3692
3693                 rec->need_takeover_run = false;
3694
3695                 /* update the list of public ips that a node can handle for
3696                    all connected nodes
3697                 */
3698                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3699                 if (ret != 0) {
3700                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3701                                          culprit));
3702                         rec->need_takeover_run = true;
3703                         return;
3704                 }
3705
3706                 /* execute the "startrecovery" event script on all nodes */
3707                 ret = run_startrecovery_eventscript(rec, nodemap);
3708                 if (ret!=0) {
3709                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3710                         ctdb_set_culprit(rec, ctdb->pnn);
3711                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3712                         return;
3713                 }
3714
3715                 ret = ctdb_takeover_run(ctdb, nodemap);
3716                 if (ret != 0) {
3717                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3718                         return;
3719                 }
3720
3721                 /* execute the "recovered" event script on all nodes */
3722                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3723 #if 0
3724 // we can't check whether the event completed successfully
3725 // since this script WILL fail if the node is in recovery mode
3726 // and if that race happens, the code here would just cause a second
3727 // cascading recovery.
3728                 if (ret!=0) {
3729                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3730                         ctdb_set_culprit(rec, ctdb->pnn);
3731                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3732                 }
3733 #endif
3734         }
3735 }
3736
3737 /*
3738   the main monitoring loop
3739  */
3740 static void monitor_cluster(struct ctdb_context *ctdb)
3741 {
3742         struct ctdb_recoverd *rec;
3743
3744         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3745
3746         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3747         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3748
3749         rec->ctdb = ctdb;
3750
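             /* remember when this recovery daemon started; priority_time is
                included in the recmaster election data elsewhere in this
                file, where an earlier start time helps break ties between
                otherwise equal candidates */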
3751         rec->priority_time = timeval_current();
3752
3753         /* register a message port for sending memory dumps */
3754         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3755
3756         /* register a message port for recovery elections */
3757         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3758
3759         /* when nodes are disabled/enabled */
3760         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3761
3762         /* when we are asked to push out a flag change */
3763         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3764
3765         /* register a message port for vacuum fetch */
3766         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3767
3768         /* register a message port for reloadnodes  */
3769         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3770
3771         /* register a message port for performing a takeover run */
3772         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3773
3774         /* register a message port for performing a reload all ips */
3775         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_ALL_IPS, ip_reloadall_handler, rec);
3776
3777         /* register a message port for disabling the ip check for a short while */
3778         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3779
3780         /* register a message port for updating the recovery daemons node assignment for an ip */
3781         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3782
3783         /* register a message port for forcing a rebalance of a node next
3784            reallocation */
3785         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3786
3787         for (;;) {
3788                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3789                 struct timeval start;
3790                 double elapsed;
3791
3792                 if (!mem_ctx) {
3793                         DEBUG(DEBUG_CRIT,(__location__
3794                                           " Failed to create temp context\n"));
3795                         exit(-1);
3796                 }
3797
3798                 start = timeval_current();
3799                 main_loop(ctdb, rec, mem_ctx);
3800                 talloc_free(mem_ctx);
3801
3802                 /* we only check for recovery once every recover_interval seconds */
3803                 elapsed = timeval_elapsed(&start);
3804                 if (elapsed < ctdb->tunable.recover_interval) {
3805                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3806                                           - elapsed);
3807                 }
3808         }
3809 }
3810
3811 /*
3812   event handler for when the main ctdbd dies
3813  */
3814 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3815                                  uint16_t flags, void *private_data)
3816 {
3817         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3818         _exit(1);
3819 }
3820
3821 /*
3822   called regularly to verify that the recovery daemon is still running
3823  */
3824 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3825                               struct timeval yt, void *p)
3826 {
3827         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3828
3829         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
3830                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3831
3832                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3833                                 ctdb_restart_recd, ctdb);
3834
3835                 return;
3836         }
3837
3838         event_add_timed(ctdb->ev, ctdb, 
3839                         timeval_current_ofs(30, 0),
3840                         ctdb_check_recd, ctdb);
3841 }
3842
3843 static void recd_sig_child_handler(struct event_context *ev,
3844         struct signal_event *se, int signum, int count,
3845         void *dont_care, 
3846         void *private_data)
3847 {
3848 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3849         int status;
3850         pid_t pid = -1;
3851
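             /* reap any children that have exited (for example the reclock
                check helper) so they do not linger as zombies; loop until
                waitpid() reports that nothing is left */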
3852         while (pid != 0) {
3853                 pid = waitpid(-1, &status, WNOHANG);
3854                 if (pid == -1) {
3855                         if (errno != ECHILD) {
3856                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3857                         }
3858                         return;
3859                 }
3860                 if (pid > 0) {
3861                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3862                 }
3863         }
3864 }
3865
3866 /*
3867   startup the recovery daemon as a child of the main ctdb daemon
3868  */
3869 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3870 {
3871         int fd[2];
3872         struct signal_event *se;
3873         struct tevent_fd *fde;
3874
3875         if (pipe(fd) != 0) {
3876                 return -1;
3877         }
3878
3879         ctdb->ctdbd_pid = getpid();
3880
3881         ctdb->recoverd_pid = ctdb_fork(ctdb);
3882         if (ctdb->recoverd_pid == -1) {
3883                 return -1;
3884         }
3885         
3886         if (ctdb->recoverd_pid != 0) {
3887                 close(fd[0]);
3888                 event_add_timed(ctdb->ev, ctdb, 
3889                                 timeval_current_ofs(30, 0),
3890                                 ctdb_check_recd, ctdb);
3891                 return 0;
3892         }
3893
3894         close(fd[1]);
3895
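             /* from here on we are the recovery daemon child.  We keep the
                read end of the pipe open and watch it below: when the main
                ctdbd dies its write end is closed, the fd becomes readable
                (EOF) and ctdb_recoverd_parent() terminates this process. */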
3896         srandom(getpid() ^ time(NULL));
3897
3898         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3899                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3900                 exit(1);
3901         }
3902
3903         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3904
3905         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3906                      ctdb_recoverd_parent, &fd[0]);     
3907         tevent_fd_set_auto_close(fde);
3908
3909         /* set up a handler to pick up sigchld */
3910         se = event_add_signal(ctdb->ev, ctdb,
3911                                      SIGCHLD, 0,
3912                                      recd_sig_child_handler,
3913                                      ctdb);
3914         if (se == NULL) {
3915                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3916                 exit(1);
3917         }
3918
3919         monitor_cluster(ctdb);
3920
3921         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3922         return -1;
3923 }
3924
3925 /*
3926   shutdown the recovery daemon
3927  */
3928 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3929 {
3930         if (ctdb->recoverd_pid == 0) {
3931                 return;
3932         }
3933
3934         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3935         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
3936 }
3937
3938 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3939                        struct timeval t, void *private_data)
3940 {
3941         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3942
3943         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3944         ctdb_stop_recoverd(ctdb);
3945         ctdb_start_recoverd(ctdb);
3946 }