Merge remote branch 'martins/policy_routing'
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/tevent/tevent.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68         TALLOC_CTX *deferred_rebalance_ctx;
69 };
70
71 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
72 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
73
74 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
75
76 /*
77   ban a node for a period of time
78  */
79 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
80 {
81         int ret;
82         struct ctdb_context *ctdb = rec->ctdb;
83         struct ctdb_ban_time bantime;
84        
85         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
86
87         if (!ctdb_validate_pnn(ctdb, pnn)) {
88                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
89                 return;
90         }
91
92         bantime.pnn  = pnn;
93         bantime.time = ban_time;
94
95         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
96         if (ret != 0) {
97                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
98                 return;
99         }
100
101 }
102
103 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
104
105
106 /*
107   run the "recovered" eventscript on all nodes
108  */
109 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
110 {
111         TALLOC_CTX *tmp_ctx;
112         uint32_t *nodes;
113
114         tmp_ctx = talloc_new(ctdb);
115         CTDB_NO_MEMORY(ctdb, tmp_ctx);
116
117         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
118         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
119                                         nodes, 0,
120                                         CONTROL_TIMEOUT(), false, tdb_null,
121                                         NULL, NULL,
122                                         NULL) != 0) {
123                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
124
125                 talloc_free(tmp_ctx);
126                 return -1;
127         }
128
129         talloc_free(tmp_ctx);
130         return 0;
131 }
132
133 /*
134   remember the trouble maker
135  */
136 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
137 {
138         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
139         struct ctdb_banning_state *ban_state;
140
141         if (culprit > ctdb->num_nodes) {
142                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
143                 return;
144         }
145
146         if (ctdb->nodes[culprit]->ban_state == NULL) {
147                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
148                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
149
150                 
151         }
152         ban_state = ctdb->nodes[culprit]->ban_state;
153         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
154                 /* this was the first time in a long while this node
155                    misbehaved so we will forgive any old transgressions.
156                 */
157                 ban_state->count = 0;
158         }
159
160         ban_state->count += count;
161         ban_state->last_reported_time = timeval_current();
162         rec->last_culprit_node = culprit;
163 }
164
165 /*
166   remember the trouble maker
167  */
168 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
169 {
170         ctdb_set_culprit_count(rec, culprit, 1);
171 }
172
173
174 /* this callback is called for every node that failed to execute the
175    start recovery event
176 */
177 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
178 {
179         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
180
181         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
182
183         ctdb_set_culprit(rec, node_pnn);
184 }
185
186 /*
187   run the "startrecovery" eventscript on all nodes
188  */
189 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
190 {
191         TALLOC_CTX *tmp_ctx;
192         uint32_t *nodes;
193         struct ctdb_context *ctdb = rec->ctdb;
194
195         tmp_ctx = talloc_new(ctdb);
196         CTDB_NO_MEMORY(ctdb, tmp_ctx);
197
198         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
199         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
200                                         nodes, 0,
201                                         CONTROL_TIMEOUT(), false, tdb_null,
202                                         NULL,
203                                         startrecovery_fail_callback,
204                                         rec) != 0) {
205                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
206                 talloc_free(tmp_ctx);
207                 return -1;
208         }
209
210         talloc_free(tmp_ctx);
211         return 0;
212 }
213
214 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
215 {
216         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
217                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
218                 return;
219         }
220         if (node_pnn < ctdb->num_nodes) {
221                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
222         }
223
224         if (node_pnn == ctdb->pnn) {
225                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
226         }
227 }
228
229 /*
230   update the node capabilities for all connected nodes
231  */
232 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
233 {
234         uint32_t *nodes;
235         TALLOC_CTX *tmp_ctx;
236
237         tmp_ctx = talloc_new(ctdb);
238         CTDB_NO_MEMORY(ctdb, tmp_ctx);
239
240         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
241         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
242                                         nodes, 0,
243                                         CONTROL_TIMEOUT(),
244                                         false, tdb_null,
245                                         async_getcap_callback, NULL,
246                                         NULL) != 0) {
247                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
248                 talloc_free(tmp_ctx);
249                 return -1;
250         }
251
252         talloc_free(tmp_ctx);
253         return 0;
254 }
255
256 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
257 {
258         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
259
260         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
261         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
262 }
263
264 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
265 {
266         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
267
268         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
269         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
270 }
271
272 /*
273   change recovery mode on all nodes
274  */
275 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
276 {
277         TDB_DATA data;
278         uint32_t *nodes;
279         TALLOC_CTX *tmp_ctx;
280
281         tmp_ctx = talloc_new(ctdb);
282         CTDB_NO_MEMORY(ctdb, tmp_ctx);
283
284         /* freeze all nodes */
285         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
286         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
287                 int i;
288
289                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
290                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
291                                                 nodes, i,
292                                                 CONTROL_TIMEOUT(),
293                                                 false, tdb_null,
294                                                 NULL,
295                                                 set_recmode_fail_callback,
296                                                 rec) != 0) {
297                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
298                                 talloc_free(tmp_ctx);
299                                 return -1;
300                         }
301                 }
302         }
303
304
305         data.dsize = sizeof(uint32_t);
306         data.dptr = (unsigned char *)&rec_mode;
307
308         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
309                                         nodes, 0,
310                                         CONTROL_TIMEOUT(),
311                                         false, data,
312                                         NULL, NULL,
313                                         NULL) != 0) {
314                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
315                 talloc_free(tmp_ctx);
316                 return -1;
317         }
318
319         talloc_free(tmp_ctx);
320         return 0;
321 }
322
323 /*
324   change recovery master on all node
325  */
326 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
327 {
328         TDB_DATA data;
329         TALLOC_CTX *tmp_ctx;
330         uint32_t *nodes;
331
332         tmp_ctx = talloc_new(ctdb);
333         CTDB_NO_MEMORY(ctdb, tmp_ctx);
334
335         data.dsize = sizeof(uint32_t);
336         data.dptr = (unsigned char *)&pnn;
337
338         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
339         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
340                                         nodes, 0,
341                                         CONTROL_TIMEOUT(), false, data,
342                                         NULL, NULL,
343                                         NULL) != 0) {
344                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
345                 talloc_free(tmp_ctx);
346                 return -1;
347         }
348
349         talloc_free(tmp_ctx);
350         return 0;
351 }
352
353 /* update all remote nodes to use the same db priority that we have
354    this can fail if the remove node has not yet been upgraded to 
355    support this function, so we always return success and never fail
356    a recovery if this call fails.
357 */
358 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
359         struct ctdb_node_map *nodemap, 
360         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
361 {
362         int db;
363         uint32_t *nodes;
364
365         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
366
367         /* step through all local databases */
368         for (db=0; db<dbmap->num;db++) {
369                 TDB_DATA data;
370                 struct ctdb_db_priority db_prio;
371                 int ret;
372
373                 db_prio.db_id     = dbmap->dbs[db].dbid;
374                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
375                 if (ret != 0) {
376                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
377                         continue;
378                 }
379
380                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
381
382                 data.dptr  = (uint8_t *)&db_prio;
383                 data.dsize = sizeof(db_prio);
384
385                 if (ctdb_client_async_control(ctdb,
386                                         CTDB_CONTROL_SET_DB_PRIORITY,
387                                         nodes, 0,
388                                         CONTROL_TIMEOUT(), false, data,
389                                         NULL, NULL,
390                                         NULL) != 0) {
391                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
392                 }
393         }
394
395         return 0;
396 }                       
397
398 /*
399   ensure all other nodes have attached to any databases that we have
400  */
401 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
402                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
403 {
404         int i, j, db, ret;
405         struct ctdb_dbid_map *remote_dbmap;
406
407         /* verify that all other nodes have all our databases */
408         for (j=0; j<nodemap->num; j++) {
409                 /* we dont need to ourself ourselves */
410                 if (nodemap->nodes[j].pnn == pnn) {
411                         continue;
412                 }
413                 /* dont check nodes that are unavailable */
414                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
415                         continue;
416                 }
417
418                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
419                                          mem_ctx, &remote_dbmap);
420                 if (ret != 0) {
421                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
422                         return -1;
423                 }
424
425                 /* step through all local databases */
426                 for (db=0; db<dbmap->num;db++) {
427                         const char *name;
428
429
430                         for (i=0;i<remote_dbmap->num;i++) {
431                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
432                                         break;
433                                 }
434                         }
435                         /* the remote node already have this database */
436                         if (i!=remote_dbmap->num) {
437                                 continue;
438                         }
439                         /* ok so we need to create this database */
440                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
441                                             mem_ctx, &name);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
444                                 return -1;
445                         }
446                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
447                                            mem_ctx, name,
448                                            dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
449                         if (ret != 0) {
450                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
451                                 return -1;
452                         }
453                 }
454         }
455
456         return 0;
457 }
458
459
460 /*
461   ensure we are attached to any databases that anyone else is attached to
462  */
463 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
464                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
465 {
466         int i, j, db, ret;
467         struct ctdb_dbid_map *remote_dbmap;
468
469         /* verify that we have all database any other node has */
470         for (j=0; j<nodemap->num; j++) {
471                 /* we dont need to ourself ourselves */
472                 if (nodemap->nodes[j].pnn == pnn) {
473                         continue;
474                 }
475                 /* dont check nodes that are unavailable */
476                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
477                         continue;
478                 }
479
480                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
481                                          mem_ctx, &remote_dbmap);
482                 if (ret != 0) {
483                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
484                         return -1;
485                 }
486
487                 /* step through all databases on the remote node */
488                 for (db=0; db<remote_dbmap->num;db++) {
489                         const char *name;
490
491                         for (i=0;i<(*dbmap)->num;i++) {
492                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
493                                         break;
494                                 }
495                         }
496                         /* we already have this db locally */
497                         if (i!=(*dbmap)->num) {
498                                 continue;
499                         }
500                         /* ok so we need to create this database and
501                            rebuild dbmap
502                          */
503                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
504                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
507                                           nodemap->nodes[j].pnn));
508                                 return -1;
509                         }
510                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
511                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
512                         if (ret != 0) {
513                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
514                                 return -1;
515                         }
516                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
517                         if (ret != 0) {
518                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
519                                 return -1;
520                         }
521                 }
522         }
523
524         return 0;
525 }
526
527
528 /*
529   pull the remote database contents from one node into the recdb
530  */
531 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
532                                     struct tdb_wrap *recdb, uint32_t dbid)
533 {
534         int ret;
535         TDB_DATA outdata;
536         struct ctdb_marshall_buffer *reply;
537         struct ctdb_rec_data *rec;
538         int i;
539         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
540
541         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
542                                CONTROL_TIMEOUT(), &outdata);
543         if (ret != 0) {
544                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
545                 talloc_free(tmp_ctx);
546                 return -1;
547         }
548
549         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
550
551         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
552                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
553                 talloc_free(tmp_ctx);
554                 return -1;
555         }
556         
557         rec = (struct ctdb_rec_data *)&reply->data[0];
558         
559         for (i=0;
560              i<reply->count;
561              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
562                 TDB_DATA key, data;
563                 struct ctdb_ltdb_header *hdr;
564                 TDB_DATA existing;
565                 
566                 key.dptr = &rec->data[0];
567                 key.dsize = rec->keylen;
568                 data.dptr = &rec->data[key.dsize];
569                 data.dsize = rec->datalen;
570                 
571                 hdr = (struct ctdb_ltdb_header *)data.dptr;
572
573                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
574                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
575                         talloc_free(tmp_ctx);
576                         return -1;
577                 }
578
579                 /* fetch the existing record, if any */
580                 existing = tdb_fetch(recdb->tdb, key);
581                 
582                 if (existing.dptr != NULL) {
583                         struct ctdb_ltdb_header header;
584                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
585                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
586                                          (unsigned)existing.dsize, srcnode));
587                                 free(existing.dptr);
588                                 talloc_free(tmp_ctx);
589                                 return -1;
590                         }
591                         header = *(struct ctdb_ltdb_header *)existing.dptr;
592                         free(existing.dptr);
593                         if (!(header.rsn < hdr->rsn ||
594                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
595                                 continue;
596                         }
597                 }
598                 
599                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
600                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
601                         talloc_free(tmp_ctx);
602                         return -1;                              
603                 }
604         }
605
606         talloc_free(tmp_ctx);
607
608         return 0;
609 }
610
611
612 struct pull_seqnum_cbdata {
613         int failed;
614         uint32_t pnn;
615         uint64_t seqnum;
616 };
617
618 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
619 {
620         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
621         uint64_t seqnum;
622
623         if (cb_data->failed != 0) {
624                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
625                 return;
626         }
627
628         if (res != 0) {
629                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
630                 cb_data->failed = 1;
631                 return;
632         }
633
634         if (outdata.dsize != sizeof(uint64_t)) {
635                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
636                 cb_data->failed = -1;
637                 return;
638         }
639
640         seqnum = *((uint64_t *)outdata.dptr);
641
642         if (seqnum > cb_data->seqnum) {
643                 cb_data->seqnum = seqnum;
644                 cb_data->pnn = node_pnn;
645         }
646 }
647
648 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
649 {
650         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
651
652         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
653         cb_data->failed = 1;
654 }
655
656 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
657                                 struct ctdb_recoverd *rec, 
658                                 struct ctdb_node_map *nodemap, 
659                                 struct tdb_wrap *recdb, uint32_t dbid)
660 {
661         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
662         uint32_t *nodes;
663         TDB_DATA data;
664         uint32_t outdata[2];
665         struct pull_seqnum_cbdata *cb_data;
666
667         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
668
669         outdata[0] = dbid;
670         outdata[1] = 0;
671
672         data.dsize = sizeof(outdata);
673         data.dptr  = (uint8_t *)&outdata[0];
674
675         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
676         if (cb_data == NULL) {
677                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
678                 talloc_free(tmp_ctx);
679                 return -1;
680         }
681
682         cb_data->failed = 0;
683         cb_data->pnn    = -1;
684         cb_data->seqnum = 0;
685         
686         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
687         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
688                                         nodes, 0,
689                                         CONTROL_TIMEOUT(), false, data,
690                                         pull_seqnum_cb,
691                                         pull_seqnum_fail_cb,
692                                         cb_data) != 0) {
693                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
694
695                 talloc_free(tmp_ctx);
696                 return -1;
697         }
698
699         if (cb_data->failed != 0) {
700                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
701                 talloc_free(tmp_ctx);
702                 return -1;
703         }
704
705         if (cb_data->seqnum == 0 || cb_data->pnn == -1) {
706                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
707                 talloc_free(tmp_ctx);
708                 return -1;
709         }
710
711         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
712
713         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
714                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
715                 talloc_free(tmp_ctx);
716                 return -1;
717         }
718
719         talloc_free(tmp_ctx);
720         return 0;
721 }
722
723
724 /*
725   pull all the remote database contents into the recdb
726  */
727 static int pull_remote_database(struct ctdb_context *ctdb,
728                                 struct ctdb_recoverd *rec, 
729                                 struct ctdb_node_map *nodemap, 
730                                 struct tdb_wrap *recdb, uint32_t dbid,
731                                 bool persistent)
732 {
733         int j;
734
735         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
736                 int ret;
737                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
738                 if (ret == 0) {
739                         return 0;
740                 }
741         }
742
743         /* pull all records from all other nodes across onto this node
744            (this merges based on rsn)
745         */
746         for (j=0; j<nodemap->num; j++) {
747                 /* dont merge from nodes that are unavailable */
748                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
749                         continue;
750                 }
751                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
752                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
753                                  nodemap->nodes[j].pnn));
754                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
755                         return -1;
756                 }
757         }
758         
759         return 0;
760 }
761
762
763 /*
764   update flags on all active nodes
765  */
766 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
767 {
768         int ret;
769
770         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
771                 if (ret != 0) {
772                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
773                 return -1;
774         }
775
776         return 0;
777 }
778
779 /*
780   ensure all nodes have the same vnnmap we do
781  */
782 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
783                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
784 {
785         int j, ret;
786
787         /* push the new vnn map out to all the nodes */
788         for (j=0; j<nodemap->num; j++) {
789                 /* dont push to nodes that are unavailable */
790                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
791                         continue;
792                 }
793
794                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
795                 if (ret != 0) {
796                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
797                         return -1;
798                 }
799         }
800
801         return 0;
802 }
803
804
805 struct vacuum_info {
806         struct vacuum_info *next, *prev;
807         struct ctdb_recoverd *rec;
808         uint32_t srcnode;
809         struct ctdb_db_context *ctdb_db;
810         struct ctdb_marshall_buffer *recs;
811         struct ctdb_rec_data *r;
812 };
813
814 static void vacuum_fetch_next(struct vacuum_info *v);
815
816 /*
817   called when a vacuum fetch has completed - just free it and do the next one
818  */
819 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
820 {
821         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
822         talloc_free(state);
823         vacuum_fetch_next(v);
824 }
825
826
827 /*
828   process the next element from the vacuum list
829 */
830 static void vacuum_fetch_next(struct vacuum_info *v)
831 {
832         struct ctdb_call call;
833         struct ctdb_rec_data *r;
834
835         while (v->recs->count) {
836                 struct ctdb_client_call_state *state;
837                 TDB_DATA data;
838                 struct ctdb_ltdb_header *hdr;
839
840                 ZERO_STRUCT(call);
841                 call.call_id = CTDB_NULL_FUNC;
842                 call.flags = CTDB_IMMEDIATE_MIGRATION;
843                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
844
845                 r = v->r;
846                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
847                 v->recs->count--;
848
849                 call.key.dptr = &r->data[0];
850                 call.key.dsize = r->keylen;
851
852                 /* ensure we don't block this daemon - just skip a record if we can't get
853                    the chainlock */
854                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
855                         continue;
856                 }
857
858                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
859                 if (data.dptr == NULL) {
860                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
861                         continue;
862                 }
863
864                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
865                         free(data.dptr);
866                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
867                         continue;
868                 }
869                 
870                 hdr = (struct ctdb_ltdb_header *)data.dptr;
871                 if (hdr->dmaster == v->rec->ctdb->pnn) {
872                         /* its already local */
873                         free(data.dptr);
874                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
875                         continue;
876                 }
877
878                 free(data.dptr);
879
880                 state = ctdb_call_send(v->ctdb_db, &call);
881                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
882                 if (state == NULL) {
883                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
884                         talloc_free(v);
885                         return;
886                 }
887                 state->async.fn = vacuum_fetch_callback;
888                 state->async.private_data = v;
889                 return;
890         }
891
892         talloc_free(v);
893 }
894
895
896 /*
897   destroy a vacuum info structure
898  */
899 static int vacuum_info_destructor(struct vacuum_info *v)
900 {
901         DLIST_REMOVE(v->rec->vacuum_info, v);
902         return 0;
903 }
904
905
906 /*
907   handler for vacuum fetch
908 */
909 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
910                                  TDB_DATA data, void *private_data)
911 {
912         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
913         struct ctdb_marshall_buffer *recs;
914         int ret, i;
915         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
916         const char *name;
917         struct ctdb_dbid_map *dbmap=NULL;
918         bool persistent = false;
919         struct ctdb_db_context *ctdb_db;
920         struct ctdb_rec_data *r;
921         uint32_t srcnode;
922         struct vacuum_info *v;
923
924         recs = (struct ctdb_marshall_buffer *)data.dptr;
925         r = (struct ctdb_rec_data *)&recs->data[0];
926
927         if (recs->count == 0) {
928                 talloc_free(tmp_ctx);
929                 return;
930         }
931
932         srcnode = r->reqid;
933
934         for (v=rec->vacuum_info;v;v=v->next) {
935                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
936                         /* we're already working on records from this node */
937                         talloc_free(tmp_ctx);
938                         return;
939                 }
940         }
941
942         /* work out if the database is persistent */
943         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
944         if (ret != 0) {
945                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
946                 talloc_free(tmp_ctx);
947                 return;
948         }
949
950         for (i=0;i<dbmap->num;i++) {
951                 if (dbmap->dbs[i].dbid == recs->db_id) {
952                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
953                         break;
954                 }
955         }
956         if (i == dbmap->num) {
957                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
958                 talloc_free(tmp_ctx);
959                 return;         
960         }
961
962         /* find the name of this database */
963         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
964                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
965                 talloc_free(tmp_ctx);
966                 return;
967         }
968
969         /* attach to it */
970         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
971         if (ctdb_db == NULL) {
972                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
973                 talloc_free(tmp_ctx);
974                 return;
975         }
976
977         v = talloc_zero(rec, struct vacuum_info);
978         if (v == NULL) {
979                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
980                 talloc_free(tmp_ctx);
981                 return;
982         }
983
984         v->rec = rec;
985         v->srcnode = srcnode;
986         v->ctdb_db = ctdb_db;
987         v->recs = talloc_memdup(v, recs, data.dsize);
988         if (v->recs == NULL) {
989                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
990                 talloc_free(v);
991                 talloc_free(tmp_ctx);
992                 return;         
993         }
994         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
995
996         DLIST_ADD(rec->vacuum_info, v);
997
998         talloc_set_destructor(v, vacuum_info_destructor);
999
1000         vacuum_fetch_next(v);
1001         talloc_free(tmp_ctx);
1002 }
1003
1004
1005 /*
1006   called when ctdb_wait_timeout should finish
1007  */
1008 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1009                               struct timeval yt, void *p)
1010 {
1011         uint32_t *timed_out = (uint32_t *)p;
1012         (*timed_out) = 1;
1013 }
1014
1015 /*
1016   wait for a given number of seconds
1017  */
1018 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1019 {
1020         uint32_t timed_out = 0;
1021         time_t usecs = (secs - (time_t)secs) * 1000000;
1022         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1023         while (!timed_out) {
1024                 event_loop_once(ctdb->ev);
1025         }
1026 }
1027
1028 /*
1029   called when an election times out (ends)
1030  */
1031 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1032                                   struct timeval t, void *p)
1033 {
1034         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1035         rec->election_timeout = NULL;
1036         fast_start = false;
1037
1038         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1039 }
1040
1041
1042 /*
1043   wait for an election to finish. It finished election_timeout seconds after
1044   the last election packet is received
1045  */
1046 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1047 {
1048         struct ctdb_context *ctdb = rec->ctdb;
1049         while (rec->election_timeout) {
1050                 event_loop_once(ctdb->ev);
1051         }
1052 }
1053
1054 /*
1055   Update our local flags from all remote connected nodes. 
1056   This is only run when we are or we belive we are the recovery master
1057  */
1058 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1059 {
1060         int j;
1061         struct ctdb_context *ctdb = rec->ctdb;
1062         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1063
1064         /* get the nodemap for all active remote nodes and verify
1065            they are the same as for this node
1066          */
1067         for (j=0; j<nodemap->num; j++) {
1068                 struct ctdb_node_map *remote_nodemap=NULL;
1069                 int ret;
1070
1071                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1072                         continue;
1073                 }
1074                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1075                         continue;
1076                 }
1077
1078                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1079                                            mem_ctx, &remote_nodemap);
1080                 if (ret != 0) {
1081                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1082                                   nodemap->nodes[j].pnn));
1083                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1084                         talloc_free(mem_ctx);
1085                         return MONITOR_FAILED;
1086                 }
1087                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1088                         /* We should tell our daemon about this so it
1089                            updates its flags or else we will log the same 
1090                            message again in the next iteration of recovery.
1091                            Since we are the recovery master we can just as
1092                            well update the flags on all nodes.
1093                         */
1094                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
1095                         if (ret != 0) {
1096                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1097                                 return -1;
1098                         }
1099
1100                         /* Update our local copy of the flags in the recovery
1101                            daemon.
1102                         */
1103                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1104                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1105                                  nodemap->nodes[j].flags));
1106                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1107                 }
1108                 talloc_free(remote_nodemap);
1109         }
1110         talloc_free(mem_ctx);
1111         return MONITOR_OK;
1112 }
1113
1114
1115 /* Create a new random generation ip. 
1116    The generation id can not be the INVALID_GENERATION id
1117 */
1118 static uint32_t new_generation(void)
1119 {
1120         uint32_t generation;
1121
1122         while (1) {
1123                 generation = random();
1124
1125                 if (generation != INVALID_GENERATION) {
1126                         break;
1127                 }
1128         }
1129
1130         return generation;
1131 }
1132
1133
1134 /*
1135   create a temporary working database
1136  */
1137 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1138 {
1139         char *name;
1140         struct tdb_wrap *recdb;
1141         unsigned tdb_flags;
1142
1143         /* open up the temporary recovery database */
1144         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1145                                ctdb->db_directory_state,
1146                                ctdb->pnn);
1147         if (name == NULL) {
1148                 return NULL;
1149         }
1150         unlink(name);
1151
1152         tdb_flags = TDB_NOLOCK;
1153         if (ctdb->valgrinding) {
1154                 tdb_flags |= TDB_NOMMAP;
1155         }
1156         tdb_flags |= TDB_DISALLOW_NESTING;
1157
1158         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1159                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1160         if (recdb == NULL) {
1161                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1162         }
1163
1164         talloc_free(name);
1165
1166         return recdb;
1167 }
1168
1169
1170 /* 
1171    a traverse function for pulling all relevent records from recdb
1172  */
1173 struct recdb_data {
1174         struct ctdb_context *ctdb;
1175         struct ctdb_marshall_buffer *recdata;
1176         uint32_t len;
1177         bool failed;
1178         bool persistent;
1179 };
1180
1181 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1182 {
1183         struct recdb_data *params = (struct recdb_data *)p;
1184         struct ctdb_rec_data *rec;
1185         struct ctdb_ltdb_header *hdr;
1186
1187         /* skip empty records */
1188         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1189                 return 0;
1190         }
1191
1192         /* update the dmaster field to point to us */
1193         hdr = (struct ctdb_ltdb_header *)data.dptr;
1194         if (!params->persistent) {
1195                 hdr->dmaster = params->ctdb->pnn;
1196                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1197         }
1198
1199         /* add the record to the blob ready to send to the nodes */
1200         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1201         if (rec == NULL) {
1202                 params->failed = true;
1203                 return -1;
1204         }
1205         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1206         if (params->recdata == NULL) {
1207                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1208                          rec->length + params->len, params->recdata->count));
1209                 params->failed = true;
1210                 return -1;
1211         }
1212         params->recdata->count++;
1213         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1214         params->len += rec->length;
1215         talloc_free(rec);
1216
1217         return 0;
1218 }
1219
1220 /*
1221   push the recdb database out to all nodes
1222  */
1223 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1224                                bool persistent,
1225                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1226 {
1227         struct recdb_data params;
1228         struct ctdb_marshall_buffer *recdata;
1229         TDB_DATA outdata;
1230         TALLOC_CTX *tmp_ctx;
1231         uint32_t *nodes;
1232
1233         tmp_ctx = talloc_new(ctdb);
1234         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1235
1236         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1237         CTDB_NO_MEMORY(ctdb, recdata);
1238
1239         recdata->db_id = dbid;
1240
1241         params.ctdb = ctdb;
1242         params.recdata = recdata;
1243         params.len = offsetof(struct ctdb_marshall_buffer, data);
1244         params.failed = false;
1245         params.persistent = persistent;
1246
1247         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1248                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1249                 talloc_free(params.recdata);
1250                 talloc_free(tmp_ctx);
1251                 return -1;
1252         }
1253
1254         if (params.failed) {
1255                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1256                 talloc_free(params.recdata);
1257                 talloc_free(tmp_ctx);
1258                 return -1;              
1259         }
1260
1261         recdata = params.recdata;
1262
1263         outdata.dptr = (void *)recdata;
1264         outdata.dsize = params.len;
1265
1266         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1267         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1268                                         nodes, 0,
1269                                         CONTROL_TIMEOUT(), false, outdata,
1270                                         NULL, NULL,
1271                                         NULL) != 0) {
1272                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1273                 talloc_free(recdata);
1274                 talloc_free(tmp_ctx);
1275                 return -1;
1276         }
1277
1278         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1279                   dbid, recdata->count));
1280
1281         talloc_free(recdata);
1282         talloc_free(tmp_ctx);
1283
1284         return 0;
1285 }
1286
1287
1288 /*
1289   go through a full recovery on one database 
1290  */
1291 static int recover_database(struct ctdb_recoverd *rec, 
1292                             TALLOC_CTX *mem_ctx,
1293                             uint32_t dbid,
1294                             bool persistent,
1295                             uint32_t pnn, 
1296                             struct ctdb_node_map *nodemap,
1297                             uint32_t transaction_id)
1298 {
1299         struct tdb_wrap *recdb;
1300         int ret;
1301         struct ctdb_context *ctdb = rec->ctdb;
1302         TDB_DATA data;
1303         struct ctdb_control_wipe_database w;
1304         uint32_t *nodes;
1305
1306         recdb = create_recdb(ctdb, mem_ctx);
1307         if (recdb == NULL) {
1308                 return -1;
1309         }
1310
1311         /* pull all remote databases onto the recdb */
1312         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1313         if (ret != 0) {
1314                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1315                 return -1;
1316         }
1317
1318         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1319
1320         /* wipe all the remote databases. This is safe as we are in a transaction */
1321         w.db_id = dbid;
1322         w.transaction_id = transaction_id;
1323
1324         data.dptr = (void *)&w;
1325         data.dsize = sizeof(w);
1326
1327         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1328         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1329                                         nodes, 0,
1330                                         CONTROL_TIMEOUT(), false, data,
1331                                         NULL, NULL,
1332                                         NULL) != 0) {
1333                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1334                 talloc_free(recdb);
1335                 return -1;
1336         }
1337         
1338         /* push out the correct database. This sets the dmaster and skips 
1339            the empty records */
1340         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1341         if (ret != 0) {
1342                 talloc_free(recdb);
1343                 return -1;
1344         }
1345
1346         /* all done with this database */
1347         talloc_free(recdb);
1348
1349         return 0;
1350 }
1351
1352 /*
1353   reload the nodes file 
1354 */
1355 static void reload_nodes_file(struct ctdb_context *ctdb)
1356 {
1357         ctdb->nodes = NULL;
1358         ctdb_load_nodes_file(ctdb);
1359 }
1360
1361 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1362                                          struct ctdb_recoverd *rec,
1363                                          struct ctdb_node_map *nodemap,
1364                                          uint32_t *culprit)
1365 {
1366         int j;
1367         int ret;
1368
1369         if (ctdb->num_nodes != nodemap->num) {
1370                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1371                                   ctdb->num_nodes, nodemap->num));
1372                 if (culprit) {
1373                         *culprit = ctdb->pnn;
1374                 }
1375                 return -1;
1376         }
1377
1378         for (j=0; j<nodemap->num; j++) {
1379                 /* release any existing data */
1380                 if (ctdb->nodes[j]->known_public_ips) {
1381                         talloc_free(ctdb->nodes[j]->known_public_ips);
1382                         ctdb->nodes[j]->known_public_ips = NULL;
1383                 }
1384                 if (ctdb->nodes[j]->available_public_ips) {
1385                         talloc_free(ctdb->nodes[j]->available_public_ips);
1386                         ctdb->nodes[j]->available_public_ips = NULL;
1387                 }
1388
1389                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1390                         continue;
1391                 }
1392
1393                 /* grab a new shiny list of public ips from the node */
1394                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1395                                         CONTROL_TIMEOUT(),
1396                                         ctdb->nodes[j]->pnn,
1397                                         ctdb->nodes,
1398                                         0,
1399                                         &ctdb->nodes[j]->known_public_ips);
1400                 if (ret != 0) {
1401                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
1402                                 ctdb->nodes[j]->pnn));
1403                         if (culprit) {
1404                                 *culprit = ctdb->nodes[j]->pnn;
1405                         }
1406                         return -1;
1407                 }
1408
1409                 if (ctdb->do_checkpublicip) {
1410                         if (rec->ip_check_disable_ctx == NULL) {
1411                                 if (verify_remote_ip_allocation(ctdb, ctdb->nodes[j]->known_public_ips)) {
1412                                         DEBUG(DEBUG_ERR,("Node %d has inconsistent public ip allocation and needs update.\n", ctdb->nodes[j]->pnn));
1413                                         rec->need_takeover_run = true;
1414                                 }
1415                         }
1416                 }
1417
1418                 /* grab a new shiny list of public ips from the node */
1419                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1420                                         CONTROL_TIMEOUT(),
1421                                         ctdb->nodes[j]->pnn,
1422                                         ctdb->nodes,
1423                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1424                                         &ctdb->nodes[j]->available_public_ips);
1425                 if (ret != 0) {
1426                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
1427                                 ctdb->nodes[j]->pnn));
1428                         if (culprit) {
1429                                 *culprit = ctdb->nodes[j]->pnn;
1430                         }
1431                         return -1;
1432                 }
1433         }
1434
1435         return 0;
1436 }
1437
1438 /* when we start a recovery, make sure all nodes use the same reclock file
1439    setting
1440 */
1441 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1442 {
1443         struct ctdb_context *ctdb = rec->ctdb;
1444         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1445         TDB_DATA data;
1446         uint32_t *nodes;
1447
1448         if (ctdb->recovery_lock_file == NULL) {
1449                 data.dptr  = NULL;
1450                 data.dsize = 0;
1451         } else {
1452                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1453                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1454         }
1455
1456         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1457         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1458                                         nodes, 0,
1459                                         CONTROL_TIMEOUT(),
1460                                         false, data,
1461                                         NULL, NULL,
1462                                         rec) != 0) {
1463                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1464                 talloc_free(tmp_ctx);
1465                 return -1;
1466         }
1467
1468         talloc_free(tmp_ctx);
1469         return 0;
1470 }
1471
1472
1473 /*
1474   we are the recmaster, and recovery is needed - start a recovery run
1475  */
1476 static int do_recovery(struct ctdb_recoverd *rec, 
1477                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1478                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1479 {
1480         struct ctdb_context *ctdb = rec->ctdb;
1481         int i, j, ret;
1482         uint32_t generation;
1483         struct ctdb_dbid_map *dbmap;
1484         TDB_DATA data;
1485         uint32_t *nodes;
1486         struct timeval start_time;
1487         uint32_t culprit = (uint32_t)-1;
1488
1489         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1490
1491         /* if recovery fails, force it again */
1492         rec->need_recovery = true;
1493
1494         for (i=0; i<ctdb->num_nodes; i++) {
1495                 struct ctdb_banning_state *ban_state;
1496
1497                 if (ctdb->nodes[i]->ban_state == NULL) {
1498                         continue;
1499                 }
1500                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1501                 if (ban_state->count < 2*ctdb->num_nodes) {
1502                         continue;
1503                 }
1504                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1505                         ctdb->nodes[i]->pnn, ban_state->count,
1506                         ctdb->tunable.recovery_ban_period));
1507                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1508                 ban_state->count = 0;
1509         }
1510
1511
1512         if (ctdb->tunable.verify_recovery_lock != 0) {
1513                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1514                 start_time = timeval_current();
1515                 if (!ctdb_recovery_lock(ctdb, true)) {
1516                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1517                                          "and ban ourself for %u seconds\n",
1518                                          ctdb->tunable.recovery_ban_period));
1519                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1520                         return -1;
1521                 }
1522                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1523                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1524         }
1525
1526         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1527
1528         /* get a list of all databases */
1529         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1530         if (ret != 0) {
1531                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1532                 return -1;
1533         }
1534
1535         /* we do the db creation before we set the recovery mode, so the freeze happens
1536            on all databases we will be dealing with. */
1537
1538         /* verify that we have all the databases any other node has */
1539         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1540         if (ret != 0) {
1541                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1542                 return -1;
1543         }
1544
1545         /* verify that all other nodes have all our databases */
1546         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1547         if (ret != 0) {
1548                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1549                 return -1;
1550         }
1551         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1552
1553         /* update the database priority for all remote databases */
1554         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1555         if (ret != 0) {
1556                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1557         }
1558         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1559
1560
1561         /* update all other nodes to use the same setting for reclock files
1562            as the local recovery master.
1563         */
1564         sync_recovery_lock_file_across_cluster(rec);
1565
1566         /* set recovery mode to active on all nodes */
1567         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1568         if (ret != 0) {
1569                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1570                 return -1;
1571         }
1572
1573         /* execute the "startrecovery" event script on all nodes */
1574         ret = run_startrecovery_eventscript(rec, nodemap);
1575         if (ret!=0) {
1576                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1577                 return -1;
1578         }
1579
1580         /*
1581           update all nodes to have the same flags that we have
1582          */
1583         for (i=0;i<nodemap->num;i++) {
1584                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1585                         continue;
1586                 }
1587
1588                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1589                 if (ret != 0) {
1590                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1591                         return -1;
1592                 }
1593         }
1594
1595         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1596
1597         /* pick a new generation number */
1598         generation = new_generation();
1599
1600         /* change the vnnmap on this node to use the new generation 
1601            number but not on any other nodes.
1602            this guarantees that if we abort the recovery prematurely
1603            for some reason (a node stops responding?)
1604            that we can just return immediately and we will reenter
1605            recovery shortly again.
1606            I.e. we deliberately leave the cluster with an inconsistent
1607            generation id to allow us to abort recovery at any stage and
1608            just restart it from scratch.
1609          */
1610         vnnmap->generation = generation;
1611         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1612         if (ret != 0) {
1613                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1614                 return -1;
1615         }
1616
1617         data.dptr = (void *)&generation;
1618         data.dsize = sizeof(uint32_t);
1619
1620         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1621         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1622                                         nodes, 0,
1623                                         CONTROL_TIMEOUT(), false, data,
1624                                         NULL,
1625                                         transaction_start_fail_callback,
1626                                         rec) != 0) {
1627                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1628                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1629                                         nodes, 0,
1630                                         CONTROL_TIMEOUT(), false, tdb_null,
1631                                         NULL,
1632                                         NULL,
1633                                         NULL) != 0) {
1634                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1635                 }
1636                 return -1;
1637         }
1638
1639         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1640
1641         for (i=0;i<dbmap->num;i++) {
1642                 ret = recover_database(rec, mem_ctx,
1643                                        dbmap->dbs[i].dbid,
1644                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1645                                        pnn, nodemap, generation);
1646                 if (ret != 0) {
1647                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1648                         return -1;
1649                 }
1650         }
1651
1652         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1653
1654         /* commit all the changes */
1655         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1656                                         nodes, 0,
1657                                         CONTROL_TIMEOUT(), false, data,
1658                                         NULL, NULL,
1659                                         NULL) != 0) {
1660                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1661                 return -1;
1662         }
1663
1664         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1665         
1666
1667         /* update the capabilities for all nodes */
1668         ret = update_capabilities(ctdb, nodemap);
1669         if (ret!=0) {
1670                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1671                 return -1;
1672         }
1673
1674         /* build a new vnn map with all the currently active and
1675            unbanned nodes */
1676         generation = new_generation();
1677         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1678         CTDB_NO_MEMORY(ctdb, vnnmap);
1679         vnnmap->generation = generation;
1680         vnnmap->size = 0;
1681         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1682         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1683         for (i=j=0;i<nodemap->num;i++) {
1684                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1685                         continue;
1686                 }
1687                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1688                         /* this node can not be an lmaster */
1689                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1690                         continue;
1691                 }
1692
1693                 vnnmap->size++;
1694                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1695                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1696                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1697
1698         }
1699         if (vnnmap->size == 0) {
1700                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1701                 vnnmap->size++;
1702                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1703                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1704                 vnnmap->map[0] = pnn;
1705         }       
1706
1707         /* update to the new vnnmap on all nodes */
1708         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1709         if (ret != 0) {
1710                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1711                 return -1;
1712         }
1713
1714         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1715
1716         /* update recmaster to point to us for all nodes */
1717         ret = set_recovery_master(ctdb, nodemap, pnn);
1718         if (ret!=0) {
1719                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1720                 return -1;
1721         }
1722
1723         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1724
1725         /*
1726           update all nodes to have the same flags that we have
1727          */
1728         for (i=0;i<nodemap->num;i++) {
1729                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1730                         continue;
1731                 }
1732
1733                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1734                 if (ret != 0) {
1735                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1736                         return -1;
1737                 }
1738         }
1739
1740         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1741
1742         /* disable recovery mode */
1743         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1744         if (ret != 0) {
1745                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1746                 return -1;
1747         }
1748
1749         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1750
1751         /*
1752           tell nodes to takeover their public IPs
1753          */
1754         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
1755         if (ret != 0) {
1756                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
1757                                  culprit));
1758                 rec->need_takeover_run = true;
1759                 return -1;
1760         }
1761         rec->need_takeover_run = false;
1762         ret = ctdb_takeover_run(ctdb, nodemap);
1763         if (ret != 0) {
1764                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
1765                 rec->need_takeover_run = true;
1766         }
1767
1768         /* execute the "recovered" event script on all nodes */
1769         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1770         if (ret!=0) {
1771                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1772                 return -1;
1773         }
1774
1775         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1776
1777         /* send a message to all clients telling them that the cluster 
1778            has been reconfigured */
1779         ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1780
1781         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1782
1783         rec->need_recovery = false;
1784
1785         /* we managed to complete a full recovery, make sure to forgive
1786            any past sins by the nodes that could now participate in the
1787            recovery.
1788         */
1789         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1790         for (i=0;i<nodemap->num;i++) {
1791                 struct ctdb_banning_state *ban_state;
1792
1793                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1794                         continue;
1795                 }
1796
1797                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1798                 if (ban_state == NULL) {
1799                         continue;
1800                 }
1801
1802                 ban_state->count = 0;
1803         }
1804
1805
1806         /* We just finished a recovery successfully. 
1807            We now wait for rerecovery_timeout before we allow 
1808            another recovery to take place.
1809         */
1810         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
1811         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1812         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
1813
1814         return 0;
1815 }
1816
1817
1818 /*
1819   elections are won by first checking the number of connected nodes, then
1820   the priority time, then the pnn
1821  */
1822 struct election_message {
1823         uint32_t num_connected;
1824         struct timeval priority_time;
1825         uint32_t pnn;
1826         uint32_t node_flags;
1827 };
1828
1829 /*
1830   form this nodes election data
1831  */
1832 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1833 {
1834         int ret, i;
1835         struct ctdb_node_map *nodemap;
1836         struct ctdb_context *ctdb = rec->ctdb;
1837
1838         ZERO_STRUCTP(em);
1839
1840         em->pnn = rec->ctdb->pnn;
1841         em->priority_time = rec->priority_time;
1842
1843         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1844         if (ret != 0) {
1845                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1846                 return;
1847         }
1848
1849         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1850         em->node_flags = rec->node_flags;
1851
1852         for (i=0;i<nodemap->num;i++) {
1853                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1854                         em->num_connected++;
1855                 }
1856         }
1857
1858         /* we shouldnt try to win this election if we cant be a recmaster */
1859         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1860                 em->num_connected = 0;
1861                 em->priority_time = timeval_current();
1862         }
1863
1864         talloc_free(nodemap);
1865 }
1866
1867 /*
1868   see if the given election data wins
1869  */
1870 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1871 {
1872         struct election_message myem;
1873         int cmp = 0;
1874
1875         ctdb_election_data(rec, &myem);
1876
1877         /* we cant win if we dont have the recmaster capability */
1878         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1879                 return false;
1880         }
1881
1882         /* we cant win if we are banned */
1883         if (rec->node_flags & NODE_FLAGS_BANNED) {
1884                 return false;
1885         }       
1886
1887         /* we cant win if we are stopped */
1888         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1889                 return false;
1890         }       
1891
1892         /* we will automatically win if the other node is banned */
1893         if (em->node_flags & NODE_FLAGS_BANNED) {
1894                 return true;
1895         }
1896
1897         /* we will automatically win if the other node is banned */
1898         if (em->node_flags & NODE_FLAGS_STOPPED) {
1899                 return true;
1900         }
1901
1902         /* try to use the most connected node */
1903         if (cmp == 0) {
1904                 cmp = (int)myem.num_connected - (int)em->num_connected;
1905         }
1906
1907         /* then the longest running node */
1908         if (cmp == 0) {
1909                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1910         }
1911
1912         if (cmp == 0) {
1913                 cmp = (int)myem.pnn - (int)em->pnn;
1914         }
1915
1916         return cmp > 0;
1917 }
1918
1919 /*
1920   send out an election request
1921  */
1922 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1923 {
1924         int ret;
1925         TDB_DATA election_data;
1926         struct election_message emsg;
1927         uint64_t srvid;
1928         struct ctdb_context *ctdb = rec->ctdb;
1929
1930         srvid = CTDB_SRVID_RECOVERY;
1931
1932         ctdb_election_data(rec, &emsg);
1933
1934         election_data.dsize = sizeof(struct election_message);
1935         election_data.dptr  = (unsigned char *)&emsg;
1936
1937
1938         /* send an election message to all active nodes */
1939         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1940         ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1941
1942
1943         /* A new node that is already frozen has entered the cluster.
1944            The existing nodes are not frozen and dont need to be frozen
1945            until the election has ended and we start the actual recovery
1946         */
1947         if (update_recmaster == true) {
1948                 /* first we assume we will win the election and set 
1949                    recoverymaster to be ourself on the current node
1950                  */
1951                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1952                 if (ret != 0) {
1953                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1954                         return -1;
1955                 }
1956         }
1957
1958
1959         return 0;
1960 }
1961
1962 /*
1963   this function will unban all nodes in the cluster
1964 */
1965 static void unban_all_nodes(struct ctdb_context *ctdb)
1966 {
1967         int ret, i;
1968         struct ctdb_node_map *nodemap;
1969         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1970         
1971         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1972         if (ret != 0) {
1973                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1974                 return;
1975         }
1976
1977         for (i=0;i<nodemap->num;i++) {
1978                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1979                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1980                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1981                 }
1982         }
1983
1984         talloc_free(tmp_ctx);
1985 }
1986
1987
1988 /*
1989   we think we are winning the election - send a broadcast election request
1990  */
1991 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1992 {
1993         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1994         int ret;
1995
1996         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1997         if (ret != 0) {
1998                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1999         }
2000
2001         talloc_free(rec->send_election_te);
2002         rec->send_election_te = NULL;
2003 }
2004
2005 /*
2006   handler for memory dumps
2007 */
2008 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2009                              TDB_DATA data, void *private_data)
2010 {
2011         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2012         TDB_DATA *dump;
2013         int ret;
2014         struct rd_memdump_reply *rd;
2015
2016         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2017                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2018                 talloc_free(tmp_ctx);
2019                 return;
2020         }
2021         rd = (struct rd_memdump_reply *)data.dptr;
2022
2023         dump = talloc_zero(tmp_ctx, TDB_DATA);
2024         if (dump == NULL) {
2025                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2026                 talloc_free(tmp_ctx);
2027                 return;
2028         }
2029         ret = ctdb_dump_memory(ctdb, dump);
2030         if (ret != 0) {
2031                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2032                 talloc_free(tmp_ctx);
2033                 return;
2034         }
2035
2036 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2037
2038         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2039         if (ret != 0) {
2040                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2041                 talloc_free(tmp_ctx);
2042                 return;
2043         }
2044
2045         talloc_free(tmp_ctx);
2046 }
2047
2048 /*
2049   handler for reload_nodes
2050 */
2051 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2052                              TDB_DATA data, void *private_data)
2053 {
2054         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2055
2056         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2057
2058         reload_nodes_file(rec->ctdb);
2059 }
2060
2061
2062 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
2063                               struct timeval yt, void *p)
2064 {
2065         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2066
2067         talloc_free(rec->ip_check_disable_ctx);
2068         rec->ip_check_disable_ctx = NULL;
2069 }
2070
2071
2072 static void ctdb_rebalance_timeout(struct event_context *ev, struct timed_event *te, 
2073                                   struct timeval t, void *p)
2074 {
2075         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2076         struct ctdb_context *ctdb = rec->ctdb;
2077         int ret;
2078
2079         DEBUG(DEBUG_NOTICE,("Rebalance all nodes that have had ip assignment changes.\n"));
2080
2081         ret = ctdb_takeover_run(ctdb, rec->nodemap);
2082         if (ret != 0) {
2083                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. ctdb_takeover_run() failed.\n"));
2084                 rec->need_takeover_run = true;
2085         }
2086
2087         talloc_free(rec->deferred_rebalance_ctx);
2088         rec->deferred_rebalance_ctx = NULL;
2089 }
2090
2091         
2092 static void recd_node_rebalance_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2093                              TDB_DATA data, void *private_data)
2094 {
2095         uint32_t pnn;
2096         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2097
2098         if (data.dsize != sizeof(uint32_t)) {
2099                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2100                 return;
2101         }
2102
2103         if (ctdb->tunable.deferred_rebalance_on_node_add == 0) {
2104                 return;
2105         }
2106
2107         pnn = *(uint32_t *)&data.dptr[0];
2108
2109         lcp2_forcerebalance(ctdb, pnn);
2110         DEBUG(DEBUG_NOTICE,("Received message to perform node rebalancing for node %d\n", pnn));
2111
2112         if (rec->deferred_rebalance_ctx != NULL) {
2113                 talloc_free(rec->deferred_rebalance_ctx);
2114         }
2115         rec->deferred_rebalance_ctx = talloc_new(rec);
2116         event_add_timed(ctdb->ev, rec->deferred_rebalance_ctx, 
2117                         timeval_current_ofs(ctdb->tunable.deferred_rebalance_on_node_add, 0),
2118                         ctdb_rebalance_timeout, rec);
2119 }
2120
2121
2122
2123 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2124                              TDB_DATA data, void *private_data)
2125 {
2126         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2127         struct ctdb_public_ip *ip;
2128
2129         if (rec->recmaster != rec->ctdb->pnn) {
2130                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2131                 return;
2132         }
2133
2134         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2135                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2136                 return;
2137         }
2138
2139         ip = (struct ctdb_public_ip *)data.dptr;
2140
2141         update_ip_assignment_tree(rec->ctdb, ip);
2142 }
2143
2144
2145 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2146                              TDB_DATA data, void *private_data)
2147 {
2148         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2149         uint32_t timeout;
2150
2151         if (rec->ip_check_disable_ctx != NULL) {
2152                 talloc_free(rec->ip_check_disable_ctx);
2153                 rec->ip_check_disable_ctx = NULL;
2154         }
2155
2156         if (data.dsize != sizeof(uint32_t)) {
2157                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2158                                  "expexting %lu\n", (long unsigned)data.dsize,
2159                                  (long unsigned)sizeof(uint32_t)));
2160                 return;
2161         }
2162         if (data.dptr == NULL) {
2163                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
2164                 return;
2165         }
2166
2167         timeout = *((uint32_t *)data.dptr);
2168         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
2169
2170         rec->ip_check_disable_ctx = talloc_new(rec);
2171         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
2172
2173         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
2174 }
2175
2176
2177 /*
2178   handler for ip reallocate, just add it to the list of callers and 
2179   handle this later in the monitor_cluster loop so we do not recurse
2180   with other callers to takeover_run()
2181 */
2182 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2183                              TDB_DATA data, void *private_data)
2184 {
2185         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2186         struct ip_reallocate_list *caller;
2187
2188         if (data.dsize != sizeof(struct rd_memdump_reply)) {
2189                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2190                 return;
2191         }
2192
2193         if (rec->ip_reallocate_ctx == NULL) {
2194                 rec->ip_reallocate_ctx = talloc_new(rec);
2195                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
2196         }
2197
2198         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
2199         CTDB_NO_MEMORY_FATAL(ctdb, caller);
2200
2201         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
2202         caller->next = rec->reallocate_callers;
2203         rec->reallocate_callers = caller;
2204
2205         return;
2206 }
2207
2208 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
2209 {
2210         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2211         TDB_DATA result;
2212         int32_t ret;
2213         struct ip_reallocate_list *callers;
2214         uint32_t culprit;
2215
2216         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2217
2218         /* update the list of public ips that a node can handle for
2219            all connected nodes
2220         */
2221         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2222         if (ret != 0) {
2223                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2224                                  culprit));
2225                 rec->need_takeover_run = true;
2226         }
2227         if (ret == 0) {
2228                 ret = ctdb_takeover_run(ctdb, rec->nodemap);
2229                 if (ret != 0) {
2230                         DEBUG(DEBUG_ERR,("Failed to reallocate addresses: ctdb_takeover_run() failed.\n"));
2231                         rec->need_takeover_run = true;
2232                 }
2233         }
2234
2235         result.dsize = sizeof(int32_t);
2236         result.dptr  = (uint8_t *)&ret;
2237
2238         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
2239
2240                 /* Someone that sent srvid==0 does not want a reply */
2241                 if (callers->rd->srvid == 0) {
2242                         continue;
2243                 }
2244                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
2245                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
2246                                   (unsigned long long)callers->rd->srvid));
2247                 ret = ctdb_client_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
2248                 if (ret != 0) {
2249                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
2250                                          "message to %u:%llu\n",
2251                                          (unsigned)callers->rd->pnn,
2252                                          (unsigned long long)callers->rd->srvid));
2253                 }
2254         }
2255
2256         talloc_free(tmp_ctx);
2257         talloc_free(rec->ip_reallocate_ctx);
2258         rec->ip_reallocate_ctx = NULL;
2259         rec->reallocate_callers = NULL;
2260         
2261 }
2262
2263
2264 /*
2265   handler for recovery master elections
2266 */
2267 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2268                              TDB_DATA data, void *private_data)
2269 {
2270         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2271         int ret;
2272         struct election_message *em = (struct election_message *)data.dptr;
2273         TALLOC_CTX *mem_ctx;
2274
2275         /* we got an election packet - update the timeout for the election */
2276         talloc_free(rec->election_timeout);
2277         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2278                                                 fast_start ?
2279                                                 timeval_current_ofs(0, 500000) :
2280                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2281                                                 ctdb_election_timeout, rec);
2282
2283         mem_ctx = talloc_new(ctdb);
2284
2285         /* someone called an election. check their election data
2286            and if we disagree and we would rather be the elected node, 
2287            send a new election message to all other nodes
2288          */
2289         if (ctdb_election_win(rec, em)) {
2290                 if (!rec->send_election_te) {
2291                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2292                                                                 timeval_current_ofs(0, 500000),
2293                                                                 election_send_request, rec);
2294                 }
2295                 talloc_free(mem_ctx);
2296                 /*unban_all_nodes(ctdb);*/
2297                 return;
2298         }
2299         
2300         /* we didn't win */
2301         talloc_free(rec->send_election_te);
2302         rec->send_election_te = NULL;
2303
2304         if (ctdb->tunable.verify_recovery_lock != 0) {
2305                 /* release the recmaster lock */
2306                 if (em->pnn != ctdb->pnn &&
2307                     ctdb->recovery_lock_fd != -1) {
2308                         close(ctdb->recovery_lock_fd);
2309                         ctdb->recovery_lock_fd = -1;
2310                         unban_all_nodes(ctdb);
2311                 }
2312         }
2313
2314         /* ok, let that guy become recmaster then */
2315         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2316         if (ret != 0) {
2317                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2318                 talloc_free(mem_ctx);
2319                 return;
2320         }
2321
2322         talloc_free(mem_ctx);
2323         return;
2324 }
2325
2326
2327 /*
2328   force the start of the election process
2329  */
2330 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2331                            struct ctdb_node_map *nodemap)
2332 {
2333         int ret;
2334         struct ctdb_context *ctdb = rec->ctdb;
2335
2336         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2337
2338         /* set all nodes to recovery mode to stop all internode traffic */
2339         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2340         if (ret != 0) {
2341                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2342                 return;
2343         }
2344
2345         talloc_free(rec->election_timeout);
2346         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2347                                                 fast_start ?
2348                                                 timeval_current_ofs(0, 500000) :
2349                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2350                                                 ctdb_election_timeout, rec);
2351
2352         ret = send_election_request(rec, pnn, true);
2353         if (ret!=0) {
2354                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2355                 return;
2356         }
2357
2358         /* wait for a few seconds to collect all responses */
2359         ctdb_wait_election(rec);
2360 }
2361
2362
2363
2364 /*
2365   handler for when a node changes its flags
2366 */
2367 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2368                             TDB_DATA data, void *private_data)
2369 {
2370         int ret;
2371         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2372         struct ctdb_node_map *nodemap=NULL;
2373         TALLOC_CTX *tmp_ctx;
2374         int i;
2375         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2376         int disabled_flag_changed;
2377
2378         if (data.dsize != sizeof(*c)) {
2379                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2380                 return;
2381         }
2382
2383         tmp_ctx = talloc_new(ctdb);
2384         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2385
2386         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2387         if (ret != 0) {
2388                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2389                 talloc_free(tmp_ctx);
2390                 return;         
2391         }
2392
2393
2394         for (i=0;i<nodemap->num;i++) {
2395                 if (nodemap->nodes[i].pnn == c->pnn) break;
2396         }
2397
2398         if (i == nodemap->num) {
2399                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2400                 talloc_free(tmp_ctx);
2401                 return;
2402         }
2403
2404         if (nodemap->nodes[i].flags != c->new_flags) {
2405                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2406         }
2407
2408         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2409
2410         nodemap->nodes[i].flags = c->new_flags;
2411
2412         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2413                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2414
2415         if (ret == 0) {
2416                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2417                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2418         }
2419         
2420         if (ret == 0 &&
2421             ctdb->recovery_master == ctdb->pnn &&
2422             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2423                 /* Only do the takeover run if the perm disabled or unhealthy
2424                    flags changed since these will cause an ip failover but not
2425                    a recovery.
2426                    If the node became disconnected or banned this will also
2427                    lead to an ip address failover but that is handled 
2428                    during recovery
2429                 */
2430                 if (disabled_flag_changed) {
2431                         rec->need_takeover_run = true;
2432                 }
2433         }
2434
2435         talloc_free(tmp_ctx);
2436 }
2437
2438 /*
2439   handler for when we need to push out flag changes ot all other nodes
2440 */
2441 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2442                             TDB_DATA data, void *private_data)
2443 {
2444         int ret;
2445         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2446         struct ctdb_node_map *nodemap=NULL;
2447         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2448         uint32_t recmaster;
2449         uint32_t *nodes;
2450
2451         /* find the recovery master */
2452         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2453         if (ret != 0) {
2454                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2455                 talloc_free(tmp_ctx);
2456                 return;
2457         }
2458
2459         /* read the node flags from the recmaster */
2460         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2461         if (ret != 0) {
2462                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2463                 talloc_free(tmp_ctx);
2464                 return;
2465         }
2466         if (c->pnn >= nodemap->num) {
2467                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2468                 talloc_free(tmp_ctx);
2469                 return;
2470         }
2471
2472         /* send the flags update to all connected nodes */
2473         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2474
2475         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2476                                       nodes, 0, CONTROL_TIMEOUT(),
2477                                       false, data,
2478                                       NULL, NULL,
2479                                       NULL) != 0) {
2480                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2481
2482                 talloc_free(tmp_ctx);
2483                 return;
2484         }
2485
2486         talloc_free(tmp_ctx);
2487 }
2488
2489
2490 struct verify_recmode_normal_data {
2491         uint32_t count;
2492         enum monitor_result status;
2493 };
2494
2495 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2496 {
2497         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2498
2499
2500         /* one more node has responded with recmode data*/
2501         rmdata->count--;
2502
2503         /* if we failed to get the recmode, then return an error and let
2504            the main loop try again.
2505         */
2506         if (state->state != CTDB_CONTROL_DONE) {
2507                 if (rmdata->status == MONITOR_OK) {
2508                         rmdata->status = MONITOR_FAILED;
2509                 }
2510                 return;
2511         }
2512
2513         /* if we got a response, then the recmode will be stored in the
2514            status field
2515         */
2516         if (state->status != CTDB_RECOVERY_NORMAL) {
2517                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2518                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2519         }
2520
2521         return;
2522 }
2523
2524
2525 /* verify that all nodes are in normal recovery mode */
2526 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2527 {
2528         struct verify_recmode_normal_data *rmdata;
2529         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2530         struct ctdb_client_control_state *state;
2531         enum monitor_result status;
2532         int j;
2533         
2534         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2535         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2536         rmdata->count  = 0;
2537         rmdata->status = MONITOR_OK;
2538
2539         /* loop over all active nodes and send an async getrecmode call to 
2540            them*/
2541         for (j=0; j<nodemap->num; j++) {
2542                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2543                         continue;
2544                 }
2545                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2546                                         CONTROL_TIMEOUT(), 
2547                                         nodemap->nodes[j].pnn);
2548                 if (state == NULL) {
2549                         /* we failed to send the control, treat this as 
2550                            an error and try again next iteration
2551                         */                      
2552                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2553                         talloc_free(mem_ctx);
2554                         return MONITOR_FAILED;
2555                 }
2556
2557                 /* set up the callback functions */
2558                 state->async.fn = verify_recmode_normal_callback;
2559                 state->async.private_data = rmdata;
2560
2561                 /* one more control to wait for to complete */
2562                 rmdata->count++;
2563         }
2564
2565
2566         /* now wait for up to the maximum number of seconds allowed
2567            or until all nodes we expect a response from has replied
2568         */
2569         while (rmdata->count > 0) {
2570                 event_loop_once(ctdb->ev);
2571         }
2572
2573         status = rmdata->status;
2574         talloc_free(mem_ctx);
2575         return status;
2576 }
2577
2578
2579 struct verify_recmaster_data {
2580         struct ctdb_recoverd *rec;
2581         uint32_t count;
2582         uint32_t pnn;
2583         enum monitor_result status;
2584 };
2585
2586 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2587 {
2588         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2589
2590
2591         /* one more node has responded with recmaster data*/
2592         rmdata->count--;
2593
2594         /* if we failed to get the recmaster, then return an error and let
2595            the main loop try again.
2596         */
2597         if (state->state != CTDB_CONTROL_DONE) {
2598                 if (rmdata->status == MONITOR_OK) {
2599                         rmdata->status = MONITOR_FAILED;
2600                 }
2601                 return;
2602         }
2603
2604         /* if we got a response, then the recmaster will be stored in the
2605            status field
2606         */
2607         if (state->status != rmdata->pnn) {
2608                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2609                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2610                 rmdata->status = MONITOR_ELECTION_NEEDED;
2611         }
2612
2613         return;
2614 }
2615
2616
2617 /* verify that all nodes agree that we are the recmaster */
2618 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2619 {
2620         struct ctdb_context *ctdb = rec->ctdb;
2621         struct verify_recmaster_data *rmdata;
2622         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2623         struct ctdb_client_control_state *state;
2624         enum monitor_result status;
2625         int j;
2626         
2627         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2628         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2629         rmdata->rec    = rec;
2630         rmdata->count  = 0;
2631         rmdata->pnn    = pnn;
2632         rmdata->status = MONITOR_OK;
2633
2634         /* loop over all active nodes and send an async getrecmaster call to 
2635            them*/
2636         for (j=0; j<nodemap->num; j++) {
2637                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2638                         continue;
2639                 }
2640                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2641                                         CONTROL_TIMEOUT(),
2642                                         nodemap->nodes[j].pnn);
2643                 if (state == NULL) {
2644                         /* we failed to send the control, treat this as 
2645                            an error and try again next iteration
2646                         */                      
2647                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2648                         talloc_free(mem_ctx);
2649                         return MONITOR_FAILED;
2650                 }
2651
2652                 /* set up the callback functions */
2653                 state->async.fn = verify_recmaster_callback;
2654                 state->async.private_data = rmdata;
2655
2656                 /* one more control to wait for to complete */
2657                 rmdata->count++;
2658         }
2659
2660
2661         /* now wait for up to the maximum number of seconds allowed
2662            or until all nodes we expect a response from has replied
2663         */
2664         while (rmdata->count > 0) {
2665                 event_loop_once(ctdb->ev);
2666         }
2667
2668         status = rmdata->status;
2669         talloc_free(mem_ctx);
2670         return status;
2671 }
2672
2673
2674 /* called to check that the local allocation of public ip addresses is ok.
2675 */
2676 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
2677 {
2678         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2679         struct ctdb_control_get_ifaces *ifaces = NULL;
2680         struct ctdb_all_public_ips *ips = NULL;
2681         struct ctdb_uptime *uptime1 = NULL;
2682         struct ctdb_uptime *uptime2 = NULL;
2683         int ret, j;
2684         bool need_iface_check = false;
2685         bool need_takeover_run = false;
2686
2687         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2688                                 CTDB_CURRENT_NODE, &uptime1);
2689         if (ret != 0) {
2690                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2691                 talloc_free(mem_ctx);
2692                 return -1;
2693         }
2694
2695
2696         /* read the interfaces from the local node */
2697         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2698         if (ret != 0) {
2699                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2700                 talloc_free(mem_ctx);
2701                 return -1;
2702         }
2703
2704         if (!rec->ifaces) {
2705                 need_iface_check = true;
2706         } else if (rec->ifaces->num != ifaces->num) {
2707                 need_iface_check = true;
2708         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2709                 need_iface_check = true;
2710         }
2711
2712         if (need_iface_check) {
2713                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
2714                                      "local node %u - force takeover run\n",
2715                                      pnn));
2716                 need_takeover_run = true;
2717         }
2718
2719         /* read the ip allocation from the local node */
2720         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2721         if (ret != 0) {
2722                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2723                 talloc_free(mem_ctx);
2724                 return -1;
2725         }
2726
2727         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2728                                 CTDB_CURRENT_NODE, &uptime2);
2729         if (ret != 0) {
2730                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2731                 talloc_free(mem_ctx);
2732                 return -1;
2733         }
2734
2735         /* skip the check if the startrecovery time has changed */
2736         if (timeval_compare(&uptime1->last_recovery_started,
2737                             &uptime2->last_recovery_started) != 0) {
2738                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2739                 talloc_free(mem_ctx);
2740                 return 0;
2741         }
2742
2743         /* skip the check if the endrecovery time has changed */
2744         if (timeval_compare(&uptime1->last_recovery_finished,
2745                             &uptime2->last_recovery_finished) != 0) {
2746                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2747                 talloc_free(mem_ctx);
2748                 return 0;
2749         }
2750
2751         /* skip the check if we have started but not finished recovery */
2752         if (timeval_compare(&uptime1->last_recovery_finished,
2753                             &uptime1->last_recovery_started) != 1) {
2754                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2755                 talloc_free(mem_ctx);
2756
2757                 return 0;
2758         }
2759
2760         talloc_free(rec->ifaces);
2761         rec->ifaces = talloc_steal(rec, ifaces);
2762
2763         /* verify that we have the ip addresses we should have
2764            and we dont have ones we shouldnt have.
2765            if we find an inconsistency we set recmode to
2766            active on the local node and wait for the recmaster
2767            to do a full blown recovery.
2768            also if the pnn is -1 and we are healthy and can host the ip
2769            we also request a ip reallocation.
2770         */
2771         if (ctdb->tunable.disable_ip_failover == 0) {
2772                 for (j=0; j<ips->num; j++) {
2773                         if (ips->ips[j].pnn == -1 && nodemap->nodes[pnn].flags == 0) {
2774                                 DEBUG(DEBUG_CRIT,("Public address '%s' is not assigned and we could serve this ip\n",
2775                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2776                                 need_takeover_run = true;
2777                         } else if (ips->ips[j].pnn == pnn) {
2778                                 if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2779                                         DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2780                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2781                                         need_takeover_run = true;
2782                                 }
2783                         } else {
2784                                 if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2785                                         DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2786                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
2787                                         need_takeover_run = true;
2788                                 }
2789                         }
2790                 }
2791         }
2792
2793         if (need_takeover_run) {
2794                 struct takeover_run_reply rd;
2795                 TDB_DATA data;
2796
2797                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2798
2799                 rd.pnn = ctdb->pnn;
2800                 rd.srvid = 0;
2801                 data.dptr = (uint8_t *)&rd;
2802                 data.dsize = sizeof(rd);
2803
2804                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2805                 if (ret != 0) {
2806                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2807                 }
2808         }
2809         talloc_free(mem_ctx);
2810         return 0;
2811 }
2812
2813
2814 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2815 {
2816         struct ctdb_node_map **remote_nodemaps = callback_data;
2817
2818         if (node_pnn >= ctdb->num_nodes) {
2819                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2820                 return;
2821         }
2822
2823         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2824
2825 }
2826
2827 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2828         struct ctdb_node_map *nodemap,
2829         struct ctdb_node_map **remote_nodemaps)
2830 {
2831         uint32_t *nodes;
2832
2833         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2834         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2835                                         nodes, 0,
2836                                         CONTROL_TIMEOUT(), false, tdb_null,
2837                                         async_getnodemap_callback,
2838                                         NULL,
2839                                         remote_nodemaps) != 0) {
2840                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2841
2842                 return -1;
2843         }
2844
2845         return 0;
2846 }
2847
2848 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2849 struct ctdb_check_reclock_state {
2850         struct ctdb_context *ctdb;
2851         struct timeval start_time;
2852         int fd[2];
2853         pid_t child;
2854         struct timed_event *te;
2855         struct fd_event *fde;
2856         enum reclock_child_status status;
2857 };
2858
2859 /* when we free the reclock state we must kill any child process.
2860 */
2861 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2862 {
2863         struct ctdb_context *ctdb = state->ctdb;
2864
2865         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2866
2867         if (state->fd[0] != -1) {
2868                 close(state->fd[0]);
2869                 state->fd[0] = -1;
2870         }
2871         if (state->fd[1] != -1) {
2872                 close(state->fd[1]);
2873                 state->fd[1] = -1;
2874         }
2875         kill(state->child, SIGKILL);
2876         return 0;
2877 }
2878
2879 /*
2880   called if our check_reclock child times out. this would happen if
2881   i/o to the reclock file blocks.
2882  */
2883 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2884                                          struct timeval t, void *private_data)
2885 {
2886         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2887                                            struct ctdb_check_reclock_state);
2888
2889         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2890         state->status = RECLOCK_TIMEOUT;
2891 }
2892
2893 /* this is called when the child process has completed checking the reclock
2894    file and has written data back to us through the pipe.
2895 */
2896 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2897                              uint16_t flags, void *private_data)
2898 {
2899         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2900                                              struct ctdb_check_reclock_state);
2901         char c = 0;
2902         int ret;
2903
2904         /* we got a response from our child process so we can abort the
2905            timeout.
2906         */
2907         talloc_free(state->te);
2908         state->te = NULL;
2909
2910         ret = read(state->fd[0], &c, 1);
2911         if (ret != 1 || c != RECLOCK_OK) {
2912                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2913                 state->status = RECLOCK_FAILED;
2914
2915                 return;
2916         }
2917
2918         state->status = RECLOCK_OK;
2919         return;
2920 }
2921
2922 static int check_recovery_lock(struct ctdb_context *ctdb)
2923 {
2924         int ret;
2925         struct ctdb_check_reclock_state *state;
2926         pid_t parent = getpid();
2927
2928         if (ctdb->recovery_lock_fd == -1) {
2929                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2930                 return -1;
2931         }
2932
2933         state = talloc(ctdb, struct ctdb_check_reclock_state);
2934         CTDB_NO_MEMORY(ctdb, state);
2935
2936         state->ctdb = ctdb;
2937         state->start_time = timeval_current();
2938         state->status = RECLOCK_CHECKING;
2939         state->fd[0] = -1;
2940         state->fd[1] = -1;
2941
2942         ret = pipe(state->fd);
2943         if (ret != 0) {
2944                 talloc_free(state);
2945                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2946                 return -1;
2947         }
2948
2949         state->child = ctdb_fork(ctdb);
2950         if (state->child == (pid_t)-1) {
2951                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2952                 close(state->fd[0]);
2953                 state->fd[0] = -1;
2954                 close(state->fd[1]);
2955                 state->fd[1] = -1;
2956                 talloc_free(state);
2957                 return -1;
2958         }
2959
2960         if (state->child == 0) {
2961                 char cc = RECLOCK_OK;
2962                 close(state->fd[0]);
2963                 state->fd[0] = -1;
2964
2965                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
2966                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2967                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2968                         cc = RECLOCK_FAILED;
2969                 }
2970
2971                 write(state->fd[1], &cc, 1);
2972                 /* make sure we die when our parent dies */
2973                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2974                         sleep(5);
2975                         write(state->fd[1], &cc, 1);
2976                 }
2977                 _exit(0);
2978         }
2979         close(state->fd[1]);
2980         state->fd[1] = -1;
2981         set_close_on_exec(state->fd[0]);
2982
2983         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2984
2985         talloc_set_destructor(state, check_reclock_destructor);
2986
2987         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2988                                     ctdb_check_reclock_timeout, state);
2989         if (state->te == NULL) {
2990                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2991                 talloc_free(state);
2992                 return -1;
2993         }
2994
2995         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2996                                 EVENT_FD_READ,
2997                                 reclock_child_handler,
2998                                 (void *)state);
2999
3000         if (state->fde == NULL) {
3001                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3002                 talloc_free(state);
3003                 return -1;
3004         }
3005         tevent_fd_set_auto_close(state->fde);
3006
3007         while (state->status == RECLOCK_CHECKING) {
3008                 event_loop_once(ctdb->ev);
3009         }
3010
3011         if (state->status == RECLOCK_FAILED) {
3012                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3013                 close(ctdb->recovery_lock_fd);
3014                 ctdb->recovery_lock_fd = -1;
3015                 talloc_free(state);
3016                 return -1;
3017         }
3018
3019         talloc_free(state);
3020         return 0;
3021 }
3022
3023 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3024 {
3025         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3026         const char *reclockfile;
3027
3028         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3029                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3030                 talloc_free(tmp_ctx);
3031                 return -1;      
3032         }
3033
3034         if (reclockfile == NULL) {
3035                 if (ctdb->recovery_lock_file != NULL) {
3036                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3037                         talloc_free(ctdb->recovery_lock_file);
3038                         ctdb->recovery_lock_file = NULL;
3039                         if (ctdb->recovery_lock_fd != -1) {
3040                                 close(ctdb->recovery_lock_fd);
3041                                 ctdb->recovery_lock_fd = -1;
3042                         }
3043                 }
3044                 ctdb->tunable.verify_recovery_lock = 0;
3045                 talloc_free(tmp_ctx);
3046                 return 0;
3047         }
3048
3049         if (ctdb->recovery_lock_file == NULL) {
3050                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3051                 if (ctdb->recovery_lock_fd != -1) {
3052                         close(ctdb->recovery_lock_fd);
3053                         ctdb->recovery_lock_fd = -1;
3054                 }
3055                 talloc_free(tmp_ctx);
3056                 return 0;
3057         }
3058
3059
3060         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3061                 talloc_free(tmp_ctx);
3062                 return 0;
3063         }
3064
3065         talloc_free(ctdb->recovery_lock_file);
3066         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3067         ctdb->tunable.verify_recovery_lock = 0;
3068         if (ctdb->recovery_lock_fd != -1) {
3069                 close(ctdb->recovery_lock_fd);
3070                 ctdb->recovery_lock_fd = -1;
3071         }
3072
3073         talloc_free(tmp_ctx);
3074         return 0;
3075 }
3076
3077 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3078                       TALLOC_CTX *mem_ctx)
3079 {
3080         uint32_t pnn;
3081         struct ctdb_node_map *nodemap=NULL;
3082         struct ctdb_node_map *recmaster_nodemap=NULL;
3083         struct ctdb_node_map **remote_nodemaps=NULL;
3084         struct ctdb_vnn_map *vnnmap=NULL;
3085         struct ctdb_vnn_map *remote_vnnmap=NULL;
3086         int32_t debug_level;
3087         int i, j, ret;
3088
3089
3090
3091         /* verify that the main daemon is still running */
3092         if (kill(ctdb->ctdbd_pid, 0) != 0) {
3093                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3094                 exit(-1);
3095         }
3096
3097         /* ping the local daemon to tell it we are alive */
3098         ctdb_ctrl_recd_ping(ctdb);
3099
3100         if (rec->election_timeout) {
3101                 /* an election is in progress */
3102                 return;
3103         }
3104
3105         /* read the debug level from the parent and update locally */
3106         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3107         if (ret !=0) {
3108                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3109                 return;
3110         }
3111         LogLevel = debug_level;
3112
3113
3114         /* We must check if we need to ban a node here but we want to do this
3115            as early as possible so we dont wait until we have pulled the node
3116            map from the local node. thats why we have the hardcoded value 20
3117         */
3118         for (i=0; i<ctdb->num_nodes; i++) {
3119                 struct ctdb_banning_state *ban_state;
3120
3121                 if (ctdb->nodes[i]->ban_state == NULL) {
3122                         continue;
3123                 }
3124                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
3125                 if (ban_state->count < 20) {
3126                         continue;
3127                 }
3128                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
3129                         ctdb->nodes[i]->pnn, ban_state->count,
3130                         ctdb->tunable.recovery_ban_period));
3131                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
3132                 ban_state->count = 0;
3133         }
3134
3135         /* get relevant tunables */
3136         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3137         if (ret != 0) {
3138                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3139                 return;
3140         }
3141
3142         /* get the current recovery lock file from the server */
3143         if (update_recovery_lock_file(ctdb) != 0) {
3144                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3145                 return;
3146         }
3147
3148         /* Make sure that if recovery lock verification becomes disabled when
3149            we close the file
3150         */
3151         if (ctdb->tunable.verify_recovery_lock == 0) {
3152                 if (ctdb->recovery_lock_fd != -1) {
3153                         close(ctdb->recovery_lock_fd);
3154                         ctdb->recovery_lock_fd = -1;
3155                 }
3156         }
3157
3158         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3159         if (pnn == (uint32_t)-1) {
3160                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
3161                 return;
3162         }
3163
3164         /* get the vnnmap */
3165         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3166         if (ret != 0) {
3167                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3168                 return;
3169         }
3170
3171
3172         /* get number of nodes */
3173         if (rec->nodemap) {
3174                 talloc_free(rec->nodemap);
3175                 rec->nodemap = NULL;
3176                 nodemap=NULL;
3177         }
3178         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3179         if (ret != 0) {
3180                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3181                 return;
3182         }
3183         nodemap = rec->nodemap;
3184
3185         /* update the capabilities for all nodes */
3186         ret = update_capabilities(ctdb, nodemap);
3187         if (ret != 0) {
3188                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3189                 return;
3190         }
3191
3192         /* check which node is the recovery master */
3193         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3194         if (ret != 0) {
3195                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3196                 return;
3197         }
3198
3199         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
3200         if (rec->recmaster != pnn) {
3201                 if (rec->ip_reallocate_ctx != NULL) {
3202                         talloc_free(rec->ip_reallocate_ctx);
3203                         rec->ip_reallocate_ctx = NULL;
3204                         rec->reallocate_callers = NULL;
3205                 }
3206         }
3207
3208         if (rec->recmaster == (uint32_t)-1) {
3209                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3210                 force_election(rec, pnn, nodemap);
3211                 return;
3212         }
3213
3214         /* if the local daemon is STOPPED, we verify that the databases are
3215            also frozen and thet the recmode is set to active 
3216         */
3217         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
3218                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3219                 if (ret != 0) {
3220                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3221                 }
3222                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3223                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
3224
3225                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3226                         if (ret != 0) {
3227                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
3228                                 return;
3229                         }
3230                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3231                         if (ret != 0) {
3232                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
3233
3234                                 return;
3235                         }
3236                         return;
3237                 }
3238         }
3239         /* If the local node is stopped, verify we are not the recmaster 
3240            and yield this role if so
3241         */
3242         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
3243                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
3244                 force_election(rec, pnn, nodemap);
3245                 return;
3246         }
3247         
3248         /*
3249          * if the current recmaster do not have CTDB_CAP_RECMASTER,
3250          * but we have force an election and try to become the new
3251          * recmaster
3252          */
3253         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3254             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3255              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3256                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3257                                   " but we (node %u) have - force an election\n",
3258                                   rec->recmaster, pnn));
3259                 force_election(rec, pnn, nodemap);
3260                 return;
3261         }
3262
3263         /* check that we (recovery daemon) and the local ctdb daemon
3264            agrees on whether we are banned or not
3265         */
3266 //qqq
3267
3268         /* remember our own node flags */
3269         rec->node_flags = nodemap->nodes[pnn].flags;
3270
3271         /* count how many active nodes there are */
3272         rec->num_active    = 0;
3273         rec->num_connected = 0;
3274         for (i=0; i<nodemap->num; i++) {
3275                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3276                         rec->num_active++;
3277                 }
3278                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3279                         rec->num_connected++;
3280                 }
3281         }
3282
3283
3284         /* verify that the recmaster node is still active */
3285         for (j=0; j<nodemap->num; j++) {
3286                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3287                         break;
3288                 }
3289         }
3290
3291         if (j == nodemap->num) {
3292                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3293                 force_election(rec, pnn, nodemap);
3294                 return;
3295         }
3296
3297         /* if recovery master is disconnected we must elect a new recmaster */
3298         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3299                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3300                 force_election(rec, pnn, nodemap);
3301                 return;
3302         }
3303
3304         /* grap the nodemap from the recovery master to check if it is banned */
3305         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3306                                    mem_ctx, &recmaster_nodemap);
3307         if (ret != 0) {
3308                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3309                           nodemap->nodes[j].pnn));
3310                 return;
3311         }
3312
3313
3314         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3315                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3316                 force_election(rec, pnn, nodemap);
3317                 return;
3318         }
3319
3320
3321         /* verify that we have all ip addresses we should have and we dont
3322          * have addresses we shouldnt have.
3323          */ 
3324         if (ctdb->tunable.disable_ip_failover == 0) {
3325                 if (rec->ip_check_disable_ctx == NULL) {
3326                         if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3327                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3328                         }
3329                 }
3330         }
3331
3332
3333         /* if we are not the recmaster then we do not need to check
3334            if recovery is needed
3335          */
3336         if (pnn != rec->recmaster) {
3337                 return;
3338         }
3339
3340
3341         /* ensure our local copies of flags are right */
3342         ret = update_local_flags(rec, nodemap);
3343         if (ret == MONITOR_ELECTION_NEEDED) {
3344                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3345                 force_election(rec, pnn, nodemap);
3346                 return;
3347         }
3348         if (ret != MONITOR_OK) {
3349                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3350                 return;
3351         }
3352
3353         if (ctdb->num_nodes != nodemap->num) {
3354                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3355                 reload_nodes_file(ctdb);
3356                 return;
3357         }
3358
3359         /* verify that all active nodes agree that we are the recmaster */
3360         switch (verify_recmaster(rec, nodemap, pnn)) {
3361         case MONITOR_RECOVERY_NEEDED:
3362                 /* can not happen */
3363                 return;
3364         case MONITOR_ELECTION_NEEDED:
3365                 force_election(rec, pnn, nodemap);
3366                 return;
3367         case MONITOR_OK:
3368                 break;
3369         case MONITOR_FAILED:
3370                 return;
3371         }
3372
3373
3374         if (rec->need_recovery) {
3375                 /* a previous recovery didn't finish */
3376                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3377                 return;
3378         }
3379
3380         /* verify that all active nodes are in normal mode 
3381            and not in recovery mode 
3382         */
3383         switch (verify_recmode(ctdb, nodemap)) {
3384         case MONITOR_RECOVERY_NEEDED:
3385                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3386                 return;
3387         case MONITOR_FAILED:
3388                 return;
3389         case MONITOR_ELECTION_NEEDED:
3390                 /* can not happen */
3391         case MONITOR_OK:
3392                 break;
3393         }
3394
3395
3396         if (ctdb->tunable.verify_recovery_lock != 0) {
3397                 /* we should have the reclock - check its not stale */
3398                 ret = check_recovery_lock(ctdb);
3399                 if (ret != 0) {
3400                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3401                         ctdb_set_culprit(rec, ctdb->pnn);
3402                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3403                         return;
3404                 }
3405         }
3406
3407         /* if there are takeovers requested, perform it and notify the waiters */
3408         if (rec->reallocate_callers) {
3409                 process_ipreallocate_requests(ctdb, rec);
3410         }
3411
3412         /* get the nodemap for all active remote nodes
3413          */
3414         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3415         if (remote_nodemaps == NULL) {
3416                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3417                 return;
3418         }
3419         for(i=0; i<nodemap->num; i++) {
3420                 remote_nodemaps[i] = NULL;
3421         }
3422         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3423                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3424                 return;
3425         } 
3426
3427         /* verify that all other nodes have the same nodemap as we have
3428         */
3429         for (j=0; j<nodemap->num; j++) {
3430                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3431                         continue;
3432                 }
3433
3434                 if (remote_nodemaps[j] == NULL) {
3435                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3436                         ctdb_set_culprit(rec, j);
3437
3438                         return;
3439                 }
3440
3441                 /* if the nodes disagree on how many nodes there are
3442                    then this is a good reason to try recovery
3443                  */
3444                 if (remote_nodemaps[j]->num != nodemap->num) {
3445                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3446                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3447                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3448                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3449                         return;
3450                 }
3451
3452                 /* if the nodes disagree on which nodes exist and are
3453                    active, then that is also a good reason to do recovery
3454                  */
3455                 for (i=0;i<nodemap->num;i++) {
3456                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3457                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3458                                           nodemap->nodes[j].pnn, i, 
3459                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3460                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3461                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3462                                             vnnmap);
3463                                 return;
3464                         }
3465                 }
3466
3467                 /* verify the flags are consistent
3468                 */
3469                 for (i=0; i<nodemap->num; i++) {
3470                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3471                                 continue;
3472                         }
3473                         
3474                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3475                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3476                                   nodemap->nodes[j].pnn, 
3477                                   nodemap->nodes[i].pnn, 
3478                                   remote_nodemaps[j]->nodes[i].flags,
3479                                   nodemap->nodes[j].flags));
3480                                 if (i == j) {
3481                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3482                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3483                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3484                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3485                                                     vnnmap);
3486                                         return;
3487                                 } else {
3488                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3489                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3490                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3491                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3492                                                     vnnmap);
3493                                         return;
3494                                 }
3495                         }
3496                 }
3497         }
3498
3499
3500         /* there better be the same number of lmasters in the vnn map
3501            as there are active nodes or we will have to do a recovery
3502          */
3503         if (vnnmap->size != rec->num_active) {
3504                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3505                           vnnmap->size, rec->num_active));
3506                 ctdb_set_culprit(rec, ctdb->pnn);
3507                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3508                 return;
3509         }
3510
3511         /* verify that all active nodes in the nodemap also exist in 
3512            the vnnmap.
3513          */
3514         for (j=0; j<nodemap->num; j++) {
3515                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3516                         continue;
3517                 }
3518                 if (nodemap->nodes[j].pnn == pnn) {
3519                         continue;
3520                 }
3521
3522                 for (i=0; i<vnnmap->size; i++) {
3523                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3524                                 break;
3525                         }
3526                 }
3527                 if (i == vnnmap->size) {
3528                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3529                                   nodemap->nodes[j].pnn));
3530                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3531                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3532                         return;
3533                 }
3534         }
3535
3536         
3537         /* verify that all other nodes have the same vnnmap
3538            and are from the same generation
3539          */
3540         for (j=0; j<nodemap->num; j++) {
3541                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3542                         continue;
3543                 }
3544                 if (nodemap->nodes[j].pnn == pnn) {
3545                         continue;
3546                 }
3547
3548                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3549                                           mem_ctx, &remote_vnnmap);
3550                 if (ret != 0) {
3551                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3552                                   nodemap->nodes[j].pnn));
3553                         return;
3554                 }
3555
3556                 /* verify the vnnmap generation is the same */
3557                 if (vnnmap->generation != remote_vnnmap->generation) {
3558                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3559                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3560                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3561                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3562                         return;
3563                 }
3564
3565                 /* verify the vnnmap size is the same */
3566                 if (vnnmap->size != remote_vnnmap->size) {
3567                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3568                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3569                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3570                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3571                         return;
3572                 }
3573
3574                 /* verify the vnnmap is the same */
3575                 for (i=0;i<vnnmap->size;i++) {
3576                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3577                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3578                                           nodemap->nodes[j].pnn));
3579                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3580                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3581                                             vnnmap);
3582                                 return;
3583                         }
3584                 }
3585         }
3586
3587         /* we might need to change who has what IP assigned */
3588         if (rec->need_takeover_run) {
3589                 uint32_t culprit = (uint32_t)-1;
3590
3591                 rec->need_takeover_run = false;
3592
3593                 /* update the list of public ips that a node can handle for
3594                    all connected nodes
3595                 */
3596                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3597                 if (ret != 0) {
3598                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3599                                          culprit));
3600                         rec->need_takeover_run = true;
3601                         return;
3602                 }
3603
3604                 /* execute the "startrecovery" event script on all nodes */
3605                 ret = run_startrecovery_eventscript(rec, nodemap);
3606                 if (ret!=0) {
3607                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3608                         ctdb_set_culprit(rec, ctdb->pnn);
3609                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3610                         return;
3611                 }
3612
3613                 ret = ctdb_takeover_run(ctdb, nodemap);
3614                 if (ret != 0) {
3615                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses. Try again later\n"));
3616                         return;
3617                 }
3618
3619                 /* execute the "recovered" event script on all nodes */
3620                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3621 #if 0
3622 // we cant check whether the event completed successfully
3623 // since this script WILL fail if the node is in recovery mode
3624 // and if that race happens, the code here would just cause a second
3625 // cascading recovery.
3626                 if (ret!=0) {
3627                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3628                         ctdb_set_culprit(rec, ctdb->pnn);
3629                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3630                 }
3631 #endif
3632         }
3633 }
3634
3635 /*
3636   the main monitoring loop
3637  */
3638 static void monitor_cluster(struct ctdb_context *ctdb)
3639 {
3640         struct ctdb_recoverd *rec;
3641
3642         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3643
3644         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3645         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3646
3647         rec->ctdb = ctdb;
3648
3649         rec->priority_time = timeval_current();
3650
3651         /* register a message port for sending memory dumps */
3652         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3653
3654         /* register a message port for recovery elections */
3655         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
3656
3657         /* when nodes are disabled/enabled */
3658         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
3659
3660         /* when we are asked to puch out a flag change */
3661         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
3662
3663         /* register a message port for vacuum fetch */
3664         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
3665
3666         /* register a message port for reloadnodes  */
3667         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
3668
3669         /* register a message port for performing a takeover run */
3670         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
3671
3672         /* register a message port for disabling the ip check for a short while */
3673         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
3674
3675         /* register a message port for updating the recovery daemons node assignment for an ip */
3676         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
3677
3678         /* register a message port for forcing a rebalance of a node next
3679            reallocation */
3680         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
3681
3682         for (;;) {
3683                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3684                 struct timeval start;
3685                 double elapsed;
3686
3687                 if (!mem_ctx) {
3688                         DEBUG(DEBUG_CRIT,(__location__
3689                                           " Failed to create temp context\n"));
3690                         exit(-1);
3691                 }
3692
3693                 start = timeval_current();
3694                 main_loop(ctdb, rec, mem_ctx);
3695                 talloc_free(mem_ctx);
3696
3697                 /* we only check for recovery once every second */
3698                 elapsed = timeval_elapsed(&start);
3699                 if (elapsed < ctdb->tunable.recover_interval) {
3700                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
3701                                           - elapsed);
3702                 }
3703         }
3704 }
3705
3706 /*
3707   event handler for when the main ctdbd dies
3708  */
3709 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3710                                  uint16_t flags, void *private_data)
3711 {
3712         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3713         _exit(1);
3714 }
3715
3716 /*
3717   called regularly to verify that the recovery daemon is still running
3718  */
3719 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3720                               struct timeval yt, void *p)
3721 {
3722         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3723
3724         if (kill(ctdb->recoverd_pid, 0) != 0) {
3725                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
3726
3727                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
3728                                 ctdb_restart_recd, ctdb);
3729
3730                 return;
3731         }
3732
3733         event_add_timed(ctdb->ev, ctdb, 
3734                         timeval_current_ofs(30, 0),
3735                         ctdb_check_recd, ctdb);
3736 }
3737
3738 static void recd_sig_child_handler(struct event_context *ev,
3739         struct signal_event *se, int signum, int count,
3740         void *dont_care, 
3741         void *private_data)
3742 {
3743 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3744         int status;
3745         pid_t pid = -1;
3746
3747         while (pid != 0) {
3748                 pid = waitpid(-1, &status, WNOHANG);
3749                 if (pid == -1) {
3750                         if (errno != ECHILD) {
3751                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3752                         }
3753                         return;
3754                 }
3755                 if (pid > 0) {
3756                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3757                 }
3758         }
3759 }
3760
3761 /*
3762   startup the recovery daemon as a child of the main ctdb daemon
3763  */
3764 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3765 {
3766         int fd[2];
3767         struct signal_event *se;
3768         struct tevent_fd *fde;
3769
3770         if (pipe(fd) != 0) {
3771                 return -1;
3772         }
3773
3774         ctdb->ctdbd_pid = getpid();
3775
3776         ctdb->recoverd_pid = fork();
3777         if (ctdb->recoverd_pid == -1) {
3778                 return -1;
3779         }
3780         
3781         if (ctdb->recoverd_pid != 0) {
3782                 close(fd[0]);
3783                 event_add_timed(ctdb->ev, ctdb, 
3784                                 timeval_current_ofs(30, 0),
3785                                 ctdb_check_recd, ctdb);
3786                 return 0;
3787         }
3788
3789         close(fd[1]);
3790
3791         srandom(getpid() ^ time(NULL));
3792
3793         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
3794                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3795                 exit(1);
3796         }
3797
3798         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3799
3800         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
3801                      ctdb_recoverd_parent, &fd[0]);     
3802         tevent_fd_set_auto_close(fde);
3803
3804         /* set up a handler to pick up sigchld */
3805         se = event_add_signal(ctdb->ev, ctdb,
3806                                      SIGCHLD, 0,
3807                                      recd_sig_child_handler,
3808                                      ctdb);
3809         if (se == NULL) {
3810                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3811                 exit(1);
3812         }
3813
3814         monitor_cluster(ctdb);
3815
3816         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3817         return -1;
3818 }
3819
3820 /*
3821   shutdown the recovery daemon
3822  */
3823 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3824 {
3825         if (ctdb->recoverd_pid == 0) {
3826                 return;
3827         }
3828
3829         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3830         kill(ctdb->recoverd_pid, SIGTERM);
3831 }
3832
3833 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
3834                        struct timeval t, void *private_data)
3835 {
3836         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3837
3838         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
3839         ctdb_stop_recoverd(ctdb);
3840         ctdb_start_recoverd(ctdb);
3841 }