allow setting the recmode even when not completely frozen.
[rusty/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67 };
68
69 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
70 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
71
72
73 /*
74   ban a node for a period of time
75  */
76 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
77 {
78         int ret;
79         struct ctdb_context *ctdb = rec->ctdb;
80         struct ctdb_ban_time bantime;
81        
82         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
83
84         if (!ctdb_validate_pnn(ctdb, pnn)) {
85                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
86                 return;
87         }
88
89         bantime.pnn  = pnn;
90         bantime.time = ban_time;
91
92         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
93         if (ret != 0) {
94                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
95                 return;
96         }
97
98 }
99
100 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
101
102
103 /*
104   run the "recovered" eventscript on all nodes
105  */
106 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
107 {
108         TALLOC_CTX *tmp_ctx;
109         uint32_t *nodes;
110
111         tmp_ctx = talloc_new(ctdb);
112         CTDB_NO_MEMORY(ctdb, tmp_ctx);
113
114         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
115         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
116                                         nodes, 0,
117                                         CONTROL_TIMEOUT(), false, tdb_null,
118                                         NULL, NULL,
119                                         NULL) != 0) {
120                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
121
122                 talloc_free(tmp_ctx);
123                 return -1;
124         }
125
126         talloc_free(tmp_ctx);
127         return 0;
128 }
129
130 /*
131   remember the trouble maker
132  */
133 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
134 {
135         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
136         struct ctdb_banning_state *ban_state;
137
138         if (culprit > ctdb->num_nodes) {
139                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
140                 return;
141         }
142
143         if (ctdb->nodes[culprit]->ban_state == NULL) {
144                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
145                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
146
147                 
148         }
149         ban_state = ctdb->nodes[culprit]->ban_state;
150         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
151                 /* this was the first time in a long while this node
152                    misbehaved so we will forgive any old transgressions.
153                 */
154                 ban_state->count = 0;
155         }
156
157         ban_state->count += count;
158         ban_state->last_reported_time = timeval_current();
159         rec->last_culprit_node = culprit;
160 }
161
162 /*
163   remember the trouble maker
164  */
165 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
166 {
167         ctdb_set_culprit_count(rec, culprit, 1);
168 }
169
170
171 /* this callback is called for every node that failed to execute the
172    start recovery event
173 */
174 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
175 {
176         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
177
178         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
179
180         ctdb_set_culprit(rec, node_pnn);
181 }
182
183 /*
184   run the "startrecovery" eventscript on all nodes
185  */
186 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
187 {
188         TALLOC_CTX *tmp_ctx;
189         uint32_t *nodes;
190         struct ctdb_context *ctdb = rec->ctdb;
191
192         tmp_ctx = talloc_new(ctdb);
193         CTDB_NO_MEMORY(ctdb, tmp_ctx);
194
195         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
196         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
197                                         nodes, 0,
198                                         CONTROL_TIMEOUT(), false, tdb_null,
199                                         NULL,
200                                         startrecovery_fail_callback,
201                                         rec) != 0) {
202                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
203                 talloc_free(tmp_ctx);
204                 return -1;
205         }
206
207         talloc_free(tmp_ctx);
208         return 0;
209 }
210
211 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
212 {
213         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
214                 DEBUG(DEBUG_ERR, (__location__ " Invalid lenght/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
215                 return;
216         }
217         if (node_pnn < ctdb->num_nodes) {
218                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
219         }
220 }
221
222 /*
223   update the node capabilities for all connected nodes
224  */
225 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
226 {
227         uint32_t *nodes;
228         TALLOC_CTX *tmp_ctx;
229
230         tmp_ctx = talloc_new(ctdb);
231         CTDB_NO_MEMORY(ctdb, tmp_ctx);
232
233         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
234         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
235                                         nodes, 0,
236                                         CONTROL_TIMEOUT(),
237                                         false, tdb_null,
238                                         async_getcap_callback, NULL,
239                                         NULL) != 0) {
240                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
241                 talloc_free(tmp_ctx);
242                 return -1;
243         }
244
245         talloc_free(tmp_ctx);
246         return 0;
247 }
248
249 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
250 {
251         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
252
253         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
254         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
255 }
256
257 /*
258   change recovery mode on all nodes
259  */
260 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
261 {
262         TDB_DATA data;
263         uint32_t *nodes;
264         TALLOC_CTX *tmp_ctx;
265
266         tmp_ctx = talloc_new(ctdb);
267         CTDB_NO_MEMORY(ctdb, tmp_ctx);
268
269         /* freeze all nodes */
270         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
271         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
272                 int i;
273
274                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
275                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
276                                                 nodes, i,
277                                                 CONTROL_TIMEOUT(),
278                                                 false, tdb_null,
279                                                 NULL,
280                                                 set_recmode_fail_callback,
281                                                 rec) != 0) {
282                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
283                                 talloc_free(tmp_ctx);
284                                 return -1;
285                         }
286                 }
287         }
288
289
290         data.dsize = sizeof(uint32_t);
291         data.dptr = (unsigned char *)&rec_mode;
292
293         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
294                                         nodes, 0,
295                                         CONTROL_TIMEOUT(),
296                                         false, data,
297                                         NULL, NULL,
298                                         NULL) != 0) {
299                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
300                 talloc_free(tmp_ctx);
301                 return -1;
302         }
303
304         talloc_free(tmp_ctx);
305         return 0;
306 }
307
308 /*
309   change recovery master on all node
310  */
311 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
312 {
313         TDB_DATA data;
314         TALLOC_CTX *tmp_ctx;
315         uint32_t *nodes;
316
317         tmp_ctx = talloc_new(ctdb);
318         CTDB_NO_MEMORY(ctdb, tmp_ctx);
319
320         data.dsize = sizeof(uint32_t);
321         data.dptr = (unsigned char *)&pnn;
322
323         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
324         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
325                                         nodes, 0,
326                                         CONTROL_TIMEOUT(), false, data,
327                                         NULL, NULL,
328                                         NULL) != 0) {
329                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
330                 talloc_free(tmp_ctx);
331                 return -1;
332         }
333
334         talloc_free(tmp_ctx);
335         return 0;
336 }
337
338 /* update all remote nodes to use the same db priority that we have
339    this can fail if the remove node has not yet been upgraded to 
340    support this function, so we always return success and never fail
341    a recovery if this call fails.
342 */
343 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
344         struct ctdb_node_map *nodemap, 
345         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
346 {
347         int db;
348         uint32_t *nodes;
349
350         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
351
352         /* step through all local databases */
353         for (db=0; db<dbmap->num;db++) {
354                 TDB_DATA data;
355                 struct ctdb_db_priority db_prio;
356                 int ret;
357
358                 db_prio.db_id     = dbmap->dbs[db].dbid;
359                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
360                 if (ret != 0) {
361                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
362                         continue;
363                 }
364
365                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
366
367                 data.dptr  = (uint8_t *)&db_prio;
368                 data.dsize = sizeof(db_prio);
369
370                 if (ctdb_client_async_control(ctdb,
371                                         CTDB_CONTROL_SET_DB_PRIORITY,
372                                         nodes, 0,
373                                         CONTROL_TIMEOUT(), false, data,
374                                         NULL, NULL,
375                                         NULL) != 0) {
376                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
377                 }
378         }
379
380         return 0;
381 }                       
382
383 /*
384   ensure all other nodes have attached to any databases that we have
385  */
386 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
387                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
388 {
389         int i, j, db, ret;
390         struct ctdb_dbid_map *remote_dbmap;
391
392         /* verify that all other nodes have all our databases */
393         for (j=0; j<nodemap->num; j++) {
394                 /* we dont need to ourself ourselves */
395                 if (nodemap->nodes[j].pnn == pnn) {
396                         continue;
397                 }
398                 /* dont check nodes that are unavailable */
399                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
400                         continue;
401                 }
402
403                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
404                                          mem_ctx, &remote_dbmap);
405                 if (ret != 0) {
406                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
407                         return -1;
408                 }
409
410                 /* step through all local databases */
411                 for (db=0; db<dbmap->num;db++) {
412                         const char *name;
413
414
415                         for (i=0;i<remote_dbmap->num;i++) {
416                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
417                                         break;
418                                 }
419                         }
420                         /* the remote node already have this database */
421                         if (i!=remote_dbmap->num) {
422                                 continue;
423                         }
424                         /* ok so we need to create this database */
425                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
426                                             mem_ctx, &name);
427                         if (ret != 0) {
428                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
429                                 return -1;
430                         }
431                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
432                                            mem_ctx, name, dbmap->dbs[db].persistent);
433                         if (ret != 0) {
434                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
435                                 return -1;
436                         }
437                 }
438         }
439
440         return 0;
441 }
442
443
444 /*
445   ensure we are attached to any databases that anyone else is attached to
446  */
447 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
448                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
449 {
450         int i, j, db, ret;
451         struct ctdb_dbid_map *remote_dbmap;
452
453         /* verify that we have all database any other node has */
454         for (j=0; j<nodemap->num; j++) {
455                 /* we dont need to ourself ourselves */
456                 if (nodemap->nodes[j].pnn == pnn) {
457                         continue;
458                 }
459                 /* dont check nodes that are unavailable */
460                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
461                         continue;
462                 }
463
464                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
465                                          mem_ctx, &remote_dbmap);
466                 if (ret != 0) {
467                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
468                         return -1;
469                 }
470
471                 /* step through all databases on the remote node */
472                 for (db=0; db<remote_dbmap->num;db++) {
473                         const char *name;
474
475                         for (i=0;i<(*dbmap)->num;i++) {
476                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
477                                         break;
478                                 }
479                         }
480                         /* we already have this db locally */
481                         if (i!=(*dbmap)->num) {
482                                 continue;
483                         }
484                         /* ok so we need to create this database and
485                            rebuild dbmap
486                          */
487                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
488                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
489                         if (ret != 0) {
490                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
491                                           nodemap->nodes[j].pnn));
492                                 return -1;
493                         }
494                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
495                                            remote_dbmap->dbs[db].persistent);
496                         if (ret != 0) {
497                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
498                                 return -1;
499                         }
500                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
501                         if (ret != 0) {
502                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
503                                 return -1;
504                         }
505                 }
506         }
507
508         return 0;
509 }
510
511
512 /*
513   pull the remote database contents from one node into the recdb
514  */
515 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
516                                     struct tdb_wrap *recdb, uint32_t dbid)
517 {
518         int ret;
519         TDB_DATA outdata;
520         struct ctdb_marshall_buffer *reply;
521         struct ctdb_rec_data *rec;
522         int i;
523         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
524
525         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
526                                CONTROL_TIMEOUT(), &outdata);
527         if (ret != 0) {
528                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
529                 talloc_free(tmp_ctx);
530                 return -1;
531         }
532
533         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
534
535         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
536                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
537                 talloc_free(tmp_ctx);
538                 return -1;
539         }
540         
541         rec = (struct ctdb_rec_data *)&reply->data[0];
542         
543         for (i=0;
544              i<reply->count;
545              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
546                 TDB_DATA key, data;
547                 struct ctdb_ltdb_header *hdr;
548                 TDB_DATA existing;
549                 
550                 key.dptr = &rec->data[0];
551                 key.dsize = rec->keylen;
552                 data.dptr = &rec->data[key.dsize];
553                 data.dsize = rec->datalen;
554                 
555                 hdr = (struct ctdb_ltdb_header *)data.dptr;
556
557                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
558                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
559                         talloc_free(tmp_ctx);
560                         return -1;
561                 }
562
563                 /* fetch the existing record, if any */
564                 existing = tdb_fetch(recdb->tdb, key);
565                 
566                 if (existing.dptr != NULL) {
567                         struct ctdb_ltdb_header header;
568                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
569                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
570                                          (unsigned)existing.dsize, srcnode));
571                                 free(existing.dptr);
572                                 talloc_free(tmp_ctx);
573                                 return -1;
574                         }
575                         header = *(struct ctdb_ltdb_header *)existing.dptr;
576                         free(existing.dptr);
577                         if (!(header.rsn < hdr->rsn ||
578                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
579                                 continue;
580                         }
581                 }
582                 
583                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
584                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
585                         talloc_free(tmp_ctx);
586                         return -1;                              
587                 }
588         }
589
590         talloc_free(tmp_ctx);
591
592         return 0;
593 }
594
595 /*
596   pull all the remote database contents into the recdb
597  */
598 static int pull_remote_database(struct ctdb_context *ctdb,
599                                 struct ctdb_recoverd *rec, 
600                                 struct ctdb_node_map *nodemap, 
601                                 struct tdb_wrap *recdb, uint32_t dbid)
602 {
603         int j;
604
605         /* pull all records from all other nodes across onto this node
606            (this merges based on rsn)
607         */
608         for (j=0; j<nodemap->num; j++) {
609                 /* dont merge from nodes that are unavailable */
610                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
611                         continue;
612                 }
613                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
614                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
615                                  nodemap->nodes[j].pnn));
616                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
617                         return -1;
618                 }
619         }
620         
621         return 0;
622 }
623
624
625 /*
626   update flags on all active nodes
627  */
628 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
629 {
630         int ret;
631
632         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
633                 if (ret != 0) {
634                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
635                 return -1;
636         }
637
638         return 0;
639 }
640
641 /*
642   ensure all nodes have the same vnnmap we do
643  */
644 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
645                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
646 {
647         int j, ret;
648
649         /* push the new vnn map out to all the nodes */
650         for (j=0; j<nodemap->num; j++) {
651                 /* dont push to nodes that are unavailable */
652                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
653                         continue;
654                 }
655
656                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
657                 if (ret != 0) {
658                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
659                         return -1;
660                 }
661         }
662
663         return 0;
664 }
665
666
667 struct vacuum_info {
668         struct vacuum_info *next, *prev;
669         struct ctdb_recoverd *rec;
670         uint32_t srcnode;
671         struct ctdb_db_context *ctdb_db;
672         struct ctdb_marshall_buffer *recs;
673         struct ctdb_rec_data *r;
674 };
675
676 static void vacuum_fetch_next(struct vacuum_info *v);
677
678 /*
679   called when a vacuum fetch has completed - just free it and do the next one
680  */
681 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
682 {
683         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
684         talloc_free(state);
685         vacuum_fetch_next(v);
686 }
687
688
689 /*
690   process the next element from the vacuum list
691 */
692 static void vacuum_fetch_next(struct vacuum_info *v)
693 {
694         struct ctdb_call call;
695         struct ctdb_rec_data *r;
696
697         while (v->recs->count) {
698                 struct ctdb_client_call_state *state;
699                 TDB_DATA data;
700                 struct ctdb_ltdb_header *hdr;
701
702                 ZERO_STRUCT(call);
703                 call.call_id = CTDB_NULL_FUNC;
704                 call.flags = CTDB_IMMEDIATE_MIGRATION;
705
706                 r = v->r;
707                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
708                 v->recs->count--;
709
710                 call.key.dptr = &r->data[0];
711                 call.key.dsize = r->keylen;
712
713                 /* ensure we don't block this daemon - just skip a record if we can't get
714                    the chainlock */
715                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
716                         continue;
717                 }
718
719                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
720                 if (data.dptr == NULL) {
721                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
722                         continue;
723                 }
724
725                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
726                         free(data.dptr);
727                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
728                         continue;
729                 }
730                 
731                 hdr = (struct ctdb_ltdb_header *)data.dptr;
732                 if (hdr->dmaster == v->rec->ctdb->pnn) {
733                         /* its already local */
734                         free(data.dptr);
735                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
736                         continue;
737                 }
738
739                 free(data.dptr);
740
741                 state = ctdb_call_send(v->ctdb_db, &call);
742                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
743                 if (state == NULL) {
744                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
745                         talloc_free(v);
746                         return;
747                 }
748                 state->async.fn = vacuum_fetch_callback;
749                 state->async.private_data = v;
750                 return;
751         }
752
753         talloc_free(v);
754 }
755
756
757 /*
758   destroy a vacuum info structure
759  */
760 static int vacuum_info_destructor(struct vacuum_info *v)
761 {
762         DLIST_REMOVE(v->rec->vacuum_info, v);
763         return 0;
764 }
765
766
767 /*
768   handler for vacuum fetch
769 */
770 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
771                                  TDB_DATA data, void *private_data)
772 {
773         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
774         struct ctdb_marshall_buffer *recs;
775         int ret, i;
776         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
777         const char *name;
778         struct ctdb_dbid_map *dbmap=NULL;
779         bool persistent = false;
780         struct ctdb_db_context *ctdb_db;
781         struct ctdb_rec_data *r;
782         uint32_t srcnode;
783         struct vacuum_info *v;
784
785         recs = (struct ctdb_marshall_buffer *)data.dptr;
786         r = (struct ctdb_rec_data *)&recs->data[0];
787
788         if (recs->count == 0) {
789                 talloc_free(tmp_ctx);
790                 return;
791         }
792
793         srcnode = r->reqid;
794
795         for (v=rec->vacuum_info;v;v=v->next) {
796                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
797                         /* we're already working on records from this node */
798                         talloc_free(tmp_ctx);
799                         return;
800                 }
801         }
802
803         /* work out if the database is persistent */
804         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
805         if (ret != 0) {
806                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
807                 talloc_free(tmp_ctx);
808                 return;
809         }
810
811         for (i=0;i<dbmap->num;i++) {
812                 if (dbmap->dbs[i].dbid == recs->db_id) {
813                         persistent = dbmap->dbs[i].persistent;
814                         break;
815                 }
816         }
817         if (i == dbmap->num) {
818                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
819                 talloc_free(tmp_ctx);
820                 return;         
821         }
822
823         /* find the name of this database */
824         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
825                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
826                 talloc_free(tmp_ctx);
827                 return;
828         }
829
830         /* attach to it */
831         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
832         if (ctdb_db == NULL) {
833                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
834                 talloc_free(tmp_ctx);
835                 return;
836         }
837
838         v = talloc_zero(rec, struct vacuum_info);
839         if (v == NULL) {
840                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
841                 talloc_free(tmp_ctx);
842                 return;
843         }
844
845         v->rec = rec;
846         v->srcnode = srcnode;
847         v->ctdb_db = ctdb_db;
848         v->recs = talloc_memdup(v, recs, data.dsize);
849         if (v->recs == NULL) {
850                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
851                 talloc_free(v);
852                 talloc_free(tmp_ctx);
853                 return;         
854         }
855         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
856
857         DLIST_ADD(rec->vacuum_info, v);
858
859         talloc_set_destructor(v, vacuum_info_destructor);
860
861         vacuum_fetch_next(v);
862         talloc_free(tmp_ctx);
863 }
864
865
866 /*
867   called when ctdb_wait_timeout should finish
868  */
869 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
870                               struct timeval yt, void *p)
871 {
872         uint32_t *timed_out = (uint32_t *)p;
873         (*timed_out) = 1;
874 }
875
876 /*
877   wait for a given number of seconds
878  */
879 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
880 {
881         uint32_t timed_out = 0;
882         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
883         while (!timed_out) {
884                 event_loop_once(ctdb->ev);
885         }
886 }
887
888 /*
889   called when an election times out (ends)
890  */
891 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
892                                   struct timeval t, void *p)
893 {
894         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
895         rec->election_timeout = NULL;
896
897         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
898 }
899
900
901 /*
902   wait for an election to finish. It finished election_timeout seconds after
903   the last election packet is received
904  */
905 static void ctdb_wait_election(struct ctdb_recoverd *rec)
906 {
907         struct ctdb_context *ctdb = rec->ctdb;
908         while (rec->election_timeout) {
909                 event_loop_once(ctdb->ev);
910         }
911 }
912
913 /*
914   Update our local flags from all remote connected nodes. 
915   This is only run when we are or we belive we are the recovery master
916  */
917 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
918 {
919         int j;
920         struct ctdb_context *ctdb = rec->ctdb;
921         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
922
923         /* get the nodemap for all active remote nodes and verify
924            they are the same as for this node
925          */
926         for (j=0; j<nodemap->num; j++) {
927                 struct ctdb_node_map *remote_nodemap=NULL;
928                 int ret;
929
930                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
931                         continue;
932                 }
933                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
934                         continue;
935                 }
936
937                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
938                                            mem_ctx, &remote_nodemap);
939                 if (ret != 0) {
940                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
941                                   nodemap->nodes[j].pnn));
942                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
943                         talloc_free(mem_ctx);
944                         return MONITOR_FAILED;
945                 }
946                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
947                         /* We should tell our daemon about this so it
948                            updates its flags or else we will log the same 
949                            message again in the next iteration of recovery.
950                            Since we are the recovery master we can just as
951                            well update the flags on all nodes.
952                         */
953                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
954                         if (ret != 0) {
955                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
956                                 return -1;
957                         }
958
959                         /* Update our local copy of the flags in the recovery
960                            daemon.
961                         */
962                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
963                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
964                                  nodemap->nodes[j].flags));
965                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
966                 }
967                 talloc_free(remote_nodemap);
968         }
969         talloc_free(mem_ctx);
970         return MONITOR_OK;
971 }
972
973
974 /* Create a new random generation ip. 
975    The generation id can not be the INVALID_GENERATION id
976 */
977 static uint32_t new_generation(void)
978 {
979         uint32_t generation;
980
981         while (1) {
982                 generation = random();
983
984                 if (generation != INVALID_GENERATION) {
985                         break;
986                 }
987         }
988
989         return generation;
990 }
991
992
993 /*
994   create a temporary working database
995  */
996 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
997 {
998         char *name;
999         struct tdb_wrap *recdb;
1000         unsigned tdb_flags;
1001
1002         /* open up the temporary recovery database */
1003         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb", ctdb->db_directory);
1004         if (name == NULL) {
1005                 return NULL;
1006         }
1007         unlink(name);
1008
1009         tdb_flags = TDB_NOLOCK;
1010         if (!ctdb->do_setsched) {
1011                 tdb_flags |= TDB_NOMMAP;
1012         }
1013
1014         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1015                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1016         if (recdb == NULL) {
1017                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1018         }
1019
1020         talloc_free(name);
1021
1022         return recdb;
1023 }
1024
1025
1026 /* 
1027    a traverse function for pulling all relevent records from recdb
1028  */
1029 struct recdb_data {
1030         struct ctdb_context *ctdb;
1031         struct ctdb_marshall_buffer *recdata;
1032         uint32_t len;
1033         bool failed;
1034 };
1035
1036 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1037 {
1038         struct recdb_data *params = (struct recdb_data *)p;
1039         struct ctdb_rec_data *rec;
1040         struct ctdb_ltdb_header *hdr;
1041
1042         /* skip empty records */
1043         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1044                 return 0;
1045         }
1046
1047         /* update the dmaster field to point to us */
1048         hdr = (struct ctdb_ltdb_header *)data.dptr;
1049         hdr->dmaster = params->ctdb->pnn;
1050
1051         /* add the record to the blob ready to send to the nodes */
1052         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1053         if (rec == NULL) {
1054                 params->failed = true;
1055                 return -1;
1056         }
1057         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1058         if (params->recdata == NULL) {
1059                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1060                          rec->length + params->len, params->recdata->count));
1061                 params->failed = true;
1062                 return -1;
1063         }
1064         params->recdata->count++;
1065         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1066         params->len += rec->length;
1067         talloc_free(rec);
1068
1069         return 0;
1070 }
1071
1072 /*
1073   push the recdb database out to all nodes
1074  */
1075 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1076                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1077 {
1078         struct recdb_data params;
1079         struct ctdb_marshall_buffer *recdata;
1080         TDB_DATA outdata;
1081         TALLOC_CTX *tmp_ctx;
1082         uint32_t *nodes;
1083
1084         tmp_ctx = talloc_new(ctdb);
1085         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1086
1087         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1088         CTDB_NO_MEMORY(ctdb, recdata);
1089
1090         recdata->db_id = dbid;
1091
1092         params.ctdb = ctdb;
1093         params.recdata = recdata;
1094         params.len = offsetof(struct ctdb_marshall_buffer, data);
1095         params.failed = false;
1096
1097         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1098                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1099                 talloc_free(params.recdata);
1100                 talloc_free(tmp_ctx);
1101                 return -1;
1102         }
1103
1104         if (params.failed) {
1105                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1106                 talloc_free(params.recdata);
1107                 talloc_free(tmp_ctx);
1108                 return -1;              
1109         }
1110
1111         recdata = params.recdata;
1112
1113         outdata.dptr = (void *)recdata;
1114         outdata.dsize = params.len;
1115
1116         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1117         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1118                                         nodes, 0,
1119                                         CONTROL_TIMEOUT(), false, outdata,
1120                                         NULL, NULL,
1121                                         NULL) != 0) {
1122                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1123                 talloc_free(recdata);
1124                 talloc_free(tmp_ctx);
1125                 return -1;
1126         }
1127
1128         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1129                   dbid, recdata->count));
1130
1131         talloc_free(recdata);
1132         talloc_free(tmp_ctx);
1133
1134         return 0;
1135 }
1136
1137
1138 /*
1139   go through a full recovery on one database 
1140  */
1141 static int recover_database(struct ctdb_recoverd *rec, 
1142                             TALLOC_CTX *mem_ctx,
1143                             uint32_t dbid,
1144                             uint32_t pnn, 
1145                             struct ctdb_node_map *nodemap,
1146                             uint32_t transaction_id)
1147 {
1148         struct tdb_wrap *recdb;
1149         int ret;
1150         struct ctdb_context *ctdb = rec->ctdb;
1151         TDB_DATA data;
1152         struct ctdb_control_wipe_database w;
1153         uint32_t *nodes;
1154
1155         recdb = create_recdb(ctdb, mem_ctx);
1156         if (recdb == NULL) {
1157                 return -1;
1158         }
1159
1160         /* pull all remote databases onto the recdb */
1161         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid);
1162         if (ret != 0) {
1163                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1164                 return -1;
1165         }
1166
1167         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1168
1169         /* wipe all the remote databases. This is safe as we are in a transaction */
1170         w.db_id = dbid;
1171         w.transaction_id = transaction_id;
1172
1173         data.dptr = (void *)&w;
1174         data.dsize = sizeof(w);
1175
1176         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1177         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1178                                         nodes, 0,
1179                                         CONTROL_TIMEOUT(), false, data,
1180                                         NULL, NULL,
1181                                         NULL) != 0) {
1182                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1183                 talloc_free(recdb);
1184                 return -1;
1185         }
1186         
1187         /* push out the correct database. This sets the dmaster and skips 
1188            the empty records */
1189         ret = push_recdb_database(ctdb, dbid, recdb, nodemap);
1190         if (ret != 0) {
1191                 talloc_free(recdb);
1192                 return -1;
1193         }
1194
1195         /* all done with this database */
1196         talloc_free(recdb);
1197
1198         return 0;
1199 }
1200
1201 /*
1202   reload the nodes file 
1203 */
1204 static void reload_nodes_file(struct ctdb_context *ctdb)
1205 {
1206         ctdb->nodes = NULL;
1207         ctdb_load_nodes_file(ctdb);
1208 }
1209
1210         
1211 /*
1212   we are the recmaster, and recovery is needed - start a recovery run
1213  */
1214 static int do_recovery(struct ctdb_recoverd *rec, 
1215                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1216                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1217 {
1218         struct ctdb_context *ctdb = rec->ctdb;
1219         int i, j, ret;
1220         uint32_t generation;
1221         struct ctdb_dbid_map *dbmap;
1222         TDB_DATA data;
1223         uint32_t *nodes;
1224         struct timeval start_time;
1225
1226         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1227
1228         /* if recovery fails, force it again */
1229         rec->need_recovery = true;
1230
1231         for (i=0; i<ctdb->num_nodes; i++) {
1232                 struct ctdb_banning_state *ban_state;
1233
1234                 if (ctdb->nodes[i]->ban_state == NULL) {
1235                         continue;
1236                 }
1237                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1238                 if (ban_state->count < 2*ctdb->num_nodes) {
1239                         continue;
1240                 }
1241                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1242                         ctdb->nodes[i]->pnn, ban_state->count,
1243                         ctdb->tunable.recovery_ban_period));
1244                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1245                 ban_state->count = 0;
1246         }
1247
1248
1249         if (ctdb->tunable.verify_recovery_lock != 0) {
1250                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1251                 start_time = timeval_current();
1252                 if (!ctdb_recovery_lock(ctdb, true)) {
1253                         ctdb_set_culprit(rec, pnn);
1254                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1255                         return -1;
1256                 }
1257                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1258                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1259         }
1260
1261         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1262
1263         /* get a list of all databases */
1264         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1265         if (ret != 0) {
1266                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1267                 return -1;
1268         }
1269
1270         /* we do the db creation before we set the recovery mode, so the freeze happens
1271            on all databases we will be dealing with. */
1272
1273         /* verify that we have all the databases any other node has */
1274         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1275         if (ret != 0) {
1276                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1277                 return -1;
1278         }
1279
1280         /* verify that all other nodes have all our databases */
1281         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1282         if (ret != 0) {
1283                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1284                 return -1;
1285         }
1286         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1287
1288         /* update the database priority for all remote databases */
1289         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1290         if (ret != 0) {
1291                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1292         }
1293         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1294
1295
1296         /* set recovery mode to active on all nodes */
1297         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1298         if (ret != 0) {
1299                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1300                 return -1;
1301         }
1302
1303         /* execute the "startrecovery" event script on all nodes */
1304         ret = run_startrecovery_eventscript(rec, nodemap);
1305         if (ret!=0) {
1306                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1307                 return -1;
1308         }
1309
1310         /* pick a new generation number */
1311         generation = new_generation();
1312
1313         /* change the vnnmap on this node to use the new generation 
1314            number but not on any other nodes.
1315            this guarantees that if we abort the recovery prematurely
1316            for some reason (a node stops responding?)
1317            that we can just return immediately and we will reenter
1318            recovery shortly again.
1319            I.e. we deliberately leave the cluster with an inconsistent
1320            generation id to allow us to abort recovery at any stage and
1321            just restart it from scratch.
1322          */
1323         vnnmap->generation = generation;
1324         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1325         if (ret != 0) {
1326                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1327                 return -1;
1328         }
1329
1330         data.dptr = (void *)&generation;
1331         data.dsize = sizeof(uint32_t);
1332
1333         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1334         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1335                                         nodes, 0,
1336                                         CONTROL_TIMEOUT(), false, data,
1337                                         NULL, NULL,
1338                                         NULL) != 0) {
1339                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1340                 return -1;
1341         }
1342
1343         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1344
1345         for (i=0;i<dbmap->num;i++) {
1346                 if (recover_database(rec, mem_ctx, dbmap->dbs[i].dbid, pnn, nodemap, generation) != 0) {
1347                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1348                         return -1;
1349                 }
1350         }
1351
1352         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1353
1354         /* commit all the changes */
1355         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1356                                         nodes, 0,
1357                                         CONTROL_TIMEOUT(), false, data,
1358                                         NULL, NULL,
1359                                         NULL) != 0) {
1360                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1361                 return -1;
1362         }
1363
1364         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1365         
1366
1367         /* update the capabilities for all nodes */
1368         ret = update_capabilities(ctdb, nodemap);
1369         if (ret!=0) {
1370                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1371                 return -1;
1372         }
1373
1374         /* build a new vnn map with all the currently active and
1375            unbanned nodes */
1376         generation = new_generation();
1377         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1378         CTDB_NO_MEMORY(ctdb, vnnmap);
1379         vnnmap->generation = generation;
1380         vnnmap->size = 0;
1381         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1382         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1383         for (i=j=0;i<nodemap->num;i++) {
1384                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1385                         continue;
1386                 }
1387                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1388                         /* this node can not be an lmaster */
1389                         DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
1390                         continue;
1391                 }
1392
1393                 vnnmap->size++;
1394                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1395                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1396                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1397
1398         }
1399         if (vnnmap->size == 0) {
1400                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1401                 vnnmap->size++;
1402                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1403                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1404                 vnnmap->map[0] = pnn;
1405         }       
1406
1407         /* update to the new vnnmap on all nodes */
1408         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1409         if (ret != 0) {
1410                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1411                 return -1;
1412         }
1413
1414         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1415
1416         /* update recmaster to point to us for all nodes */
1417         ret = set_recovery_master(ctdb, nodemap, pnn);
1418         if (ret!=0) {
1419                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1420                 return -1;
1421         }
1422
1423         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1424
1425         /*
1426           update all nodes to have the same flags that we have
1427          */
1428         for (i=0;i<nodemap->num;i++) {
1429                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1430                         continue;
1431                 }
1432
1433                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1434                 if (ret != 0) {
1435                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1436                         return -1;
1437                 }
1438         }
1439
1440         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1441
1442         /* disable recovery mode */
1443         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1444         if (ret != 0) {
1445                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1446                 return -1;
1447         }
1448
1449         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1450
1451         /*
1452           tell nodes to takeover their public IPs
1453          */
1454         rec->need_takeover_run = false;
1455         ret = ctdb_takeover_run(ctdb, nodemap);
1456         if (ret != 0) {
1457                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1458                 return -1;
1459         }
1460         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1461
1462         /* execute the "recovered" event script on all nodes */
1463         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1464         if (ret!=0) {
1465                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1466                 return -1;
1467         }
1468
1469         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1470
1471         /* send a message to all clients telling them that the cluster 
1472            has been reconfigured */
1473         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1474
1475         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1476
1477         rec->need_recovery = false;
1478
1479         /* we managed to complete a full recovery, make sure to forgive
1480            any past sins by the nodes that could now participate in the
1481            recovery.
1482         */
1483         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1484         for (i=0;i<nodemap->num;i++) {
1485                 struct ctdb_banning_state *ban_state;
1486
1487                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1488                         continue;
1489                 }
1490
1491                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1492                 if (ban_state == NULL) {
1493                         continue;
1494                 }
1495
1496                 ban_state->count = 0;
1497         }
1498
1499
1500         /* We just finished a recovery successfully. 
1501            We now wait for rerecovery_timeout before we allow 
1502            another recovery to take place.
1503         */
1504         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries supressed for the rerecovery timeout\n"));
1505         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1506         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1507
1508         return 0;
1509 }
1510
1511
1512 /*
1513   elections are won by first checking the number of connected nodes, then
1514   the priority time, then the pnn
1515  */
1516 struct election_message {
1517         uint32_t num_connected;
1518         struct timeval priority_time;
1519         uint32_t pnn;
1520         uint32_t node_flags;
1521 };
1522
1523 /*
1524   form this nodes election data
1525  */
1526 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1527 {
1528         int ret, i;
1529         struct ctdb_node_map *nodemap;
1530         struct ctdb_context *ctdb = rec->ctdb;
1531
1532         ZERO_STRUCTP(em);
1533
1534         em->pnn = rec->ctdb->pnn;
1535         em->priority_time = rec->priority_time;
1536
1537         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1538         if (ret != 0) {
1539                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1540                 return;
1541         }
1542
1543         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1544         em->node_flags = rec->node_flags;
1545
1546         for (i=0;i<nodemap->num;i++) {
1547                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1548                         em->num_connected++;
1549                 }
1550         }
1551
1552         /* we shouldnt try to win this election if we cant be a recmaster */
1553         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1554                 em->num_connected = 0;
1555                 em->priority_time = timeval_current();
1556         }
1557
1558         talloc_free(nodemap);
1559 }
1560
1561 /*
1562   see if the given election data wins
1563  */
1564 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1565 {
1566         struct election_message myem;
1567         int cmp = 0;
1568
1569         ctdb_election_data(rec, &myem);
1570
1571         /* we cant win if we dont have the recmaster capability */
1572         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1573                 return false;
1574         }
1575
1576         /* we cant win if we are banned */
1577         if (rec->node_flags & NODE_FLAGS_BANNED) {
1578                 return false;
1579         }       
1580
1581         /* we cant win if we are stopped */
1582         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1583                 return false;
1584         }       
1585
1586         /* we will automatically win if the other node is banned */
1587         if (em->node_flags & NODE_FLAGS_BANNED) {
1588                 return true;
1589         }
1590
1591         /* we will automatically win if the other node is banned */
1592         if (em->node_flags & NODE_FLAGS_STOPPED) {
1593                 return true;
1594         }
1595
1596         /* try to use the most connected node */
1597         if (cmp == 0) {
1598                 cmp = (int)myem.num_connected - (int)em->num_connected;
1599         }
1600
1601         /* then the longest running node */
1602         if (cmp == 0) {
1603                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1604         }
1605
1606         if (cmp == 0) {
1607                 cmp = (int)myem.pnn - (int)em->pnn;
1608         }
1609
1610         return cmp > 0;
1611 }
1612
1613 /*
1614   send out an election request
1615  */
1616 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1617 {
1618         int ret;
1619         TDB_DATA election_data;
1620         struct election_message emsg;
1621         uint64_t srvid;
1622         struct ctdb_context *ctdb = rec->ctdb;
1623
1624         srvid = CTDB_SRVID_RECOVERY;
1625
1626         ctdb_election_data(rec, &emsg);
1627
1628         election_data.dsize = sizeof(struct election_message);
1629         election_data.dptr  = (unsigned char *)&emsg;
1630
1631
1632         /* send an election message to all active nodes */
1633         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1634         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1635
1636
1637         /* A new node that is already frozen has entered the cluster.
1638            The existing nodes are not frozen and dont need to be frozen
1639            until the election has ended and we start the actual recovery
1640         */
1641         if (update_recmaster == true) {
1642                 /* first we assume we will win the election and set 
1643                    recoverymaster to be ourself on the current node
1644                  */
1645                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1646                 if (ret != 0) {
1647                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1648                         return -1;
1649                 }
1650         }
1651
1652
1653         return 0;
1654 }
1655
1656 /*
1657   this function will unban all nodes in the cluster
1658 */
1659 static void unban_all_nodes(struct ctdb_context *ctdb)
1660 {
1661         int ret, i;
1662         struct ctdb_node_map *nodemap;
1663         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1664         
1665         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1666         if (ret != 0) {
1667                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1668                 return;
1669         }
1670
1671         for (i=0;i<nodemap->num;i++) {
1672                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1673                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1674                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1675                 }
1676         }
1677
1678         talloc_free(tmp_ctx);
1679 }
1680
1681
1682 /*
1683   we think we are winning the election - send a broadcast election request
1684  */
1685 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1686 {
1687         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1688         int ret;
1689
1690         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1691         if (ret != 0) {
1692                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1693         }
1694
1695         talloc_free(rec->send_election_te);
1696         rec->send_election_te = NULL;
1697 }
1698
1699 /*
1700   handler for memory dumps
1701 */
1702 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1703                              TDB_DATA data, void *private_data)
1704 {
1705         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1706         TDB_DATA *dump;
1707         int ret;
1708         struct rd_memdump_reply *rd;
1709
1710         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1711                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1712                 talloc_free(tmp_ctx);
1713                 return;
1714         }
1715         rd = (struct rd_memdump_reply *)data.dptr;
1716
1717         dump = talloc_zero(tmp_ctx, TDB_DATA);
1718         if (dump == NULL) {
1719                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1720                 talloc_free(tmp_ctx);
1721                 return;
1722         }
1723         ret = ctdb_dump_memory(ctdb, dump);
1724         if (ret != 0) {
1725                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1726                 talloc_free(tmp_ctx);
1727                 return;
1728         }
1729
1730 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
1731
1732         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1733         if (ret != 0) {
1734                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1735                 talloc_free(tmp_ctx);
1736                 return;
1737         }
1738
1739         talloc_free(tmp_ctx);
1740 }
1741
1742 /*
1743   handler for reload_nodes
1744 */
1745 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1746                              TDB_DATA data, void *private_data)
1747 {
1748         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1749
1750         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1751
1752         reload_nodes_file(rec->ctdb);
1753 }
1754
1755
1756 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1757                               struct timeval yt, void *p)
1758 {
1759         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1760
1761         talloc_free(rec->ip_check_disable_ctx);
1762         rec->ip_check_disable_ctx = NULL;
1763 }
1764
1765 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1766                              TDB_DATA data, void *private_data)
1767 {
1768         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1769         uint32_t timeout;
1770
1771         if (rec->ip_check_disable_ctx != NULL) {
1772                 talloc_free(rec->ip_check_disable_ctx);
1773                 rec->ip_check_disable_ctx = NULL;
1774         }
1775
1776         if (data.dsize != sizeof(uint32_t)) {
1777                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu expexting %lu\n", data.dsize, sizeof(uint32_t)));
1778                 return;
1779         }
1780         if (data.dptr == NULL) {
1781                 DEBUG(DEBUG_ERR,(__location__ " No data recaived\n"));
1782                 return;
1783         }
1784
1785         timeout = *((uint32_t *)data.dptr);
1786         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1787
1788         rec->ip_check_disable_ctx = talloc_new(rec);
1789         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1790
1791         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1792 }
1793
1794
1795 /*
1796   handler for ip reallocate, just add it to the list of callers and 
1797   handle this later in the monitor_cluster loop so we do not recurse
1798   with other callers to takeover_run()
1799 */
1800 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1801                              TDB_DATA data, void *private_data)
1802 {
1803         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1804         struct ip_reallocate_list *caller;
1805
1806         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1807                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1808                 return;
1809         }
1810
1811         if (rec->ip_reallocate_ctx == NULL) {
1812                 rec->ip_reallocate_ctx = talloc_new(rec);
1813                 CTDB_NO_MEMORY_FATAL(ctdb, caller);
1814         }
1815
1816         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1817         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1818
1819         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1820         caller->next = rec->reallocate_callers;
1821         rec->reallocate_callers = caller;
1822
1823         return;
1824 }
1825
1826 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1827 {
1828         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1829         TDB_DATA result;
1830         int32_t ret;
1831         struct ip_reallocate_list *callers;
1832
1833         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1834         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1835         result.dsize = sizeof(int32_t);
1836         result.dptr  = (uint8_t *)&ret;
1837
1838         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1839                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1840                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1841                 if (ret != 0) {
1842                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply message to %u:%lu\n", callers->rd->pnn, callers->rd->srvid));
1843                 }
1844         }
1845
1846         talloc_free(tmp_ctx);
1847         talloc_free(rec->ip_reallocate_ctx);
1848         rec->ip_reallocate_ctx = NULL;
1849         rec->reallocate_callers = NULL;
1850         
1851 }
1852
1853
1854 /*
1855   handler for recovery master elections
1856 */
1857 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1858                              TDB_DATA data, void *private_data)
1859 {
1860         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1861         int ret;
1862         struct election_message *em = (struct election_message *)data.dptr;
1863         TALLOC_CTX *mem_ctx;
1864
1865         /* we got an election packet - update the timeout for the election */
1866         talloc_free(rec->election_timeout);
1867         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1868                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1869                                                 ctdb_election_timeout, rec);
1870
1871         mem_ctx = talloc_new(ctdb);
1872
1873         /* someone called an election. check their election data
1874            and if we disagree and we would rather be the elected node, 
1875            send a new election message to all other nodes
1876          */
1877         if (ctdb_election_win(rec, em)) {
1878                 if (!rec->send_election_te) {
1879                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1880                                                                 timeval_current_ofs(0, 500000),
1881                                                                 election_send_request, rec);
1882                 }
1883                 talloc_free(mem_ctx);
1884                 /*unban_all_nodes(ctdb);*/
1885                 return;
1886         }
1887         
1888         /* we didn't win */
1889         talloc_free(rec->send_election_te);
1890         rec->send_election_te = NULL;
1891
1892         if (ctdb->tunable.verify_recovery_lock != 0) {
1893                 /* release the recmaster lock */
1894                 if (em->pnn != ctdb->pnn &&
1895                     ctdb->recovery_lock_fd != -1) {
1896                         close(ctdb->recovery_lock_fd);
1897                         ctdb->recovery_lock_fd = -1;
1898                         unban_all_nodes(ctdb);
1899                 }
1900         }
1901
1902         /* ok, let that guy become recmaster then */
1903         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1904         if (ret != 0) {
1905                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
1906                 talloc_free(mem_ctx);
1907                 return;
1908         }
1909
1910         talloc_free(mem_ctx);
1911         return;
1912 }
1913
1914
1915 /*
1916   force the start of the election process
1917  */
1918 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1919                            struct ctdb_node_map *nodemap)
1920 {
1921         int ret;
1922         struct ctdb_context *ctdb = rec->ctdb;
1923
1924         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1925
1926         /* set all nodes to recovery mode to stop all internode traffic */
1927         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1928         if (ret != 0) {
1929                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1930                 return;
1931         }
1932
1933         talloc_free(rec->election_timeout);
1934         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1935                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1936                                                 ctdb_election_timeout, rec);
1937
1938         ret = send_election_request(rec, pnn, true);
1939         if (ret!=0) {
1940                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
1941                 return;
1942         }
1943
1944         /* wait for a few seconds to collect all responses */
1945         ctdb_wait_election(rec);
1946 }
1947
1948
1949
1950 /*
1951   handler for when a node changes its flags
1952 */
1953 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1954                             TDB_DATA data, void *private_data)
1955 {
1956         int ret;
1957         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
1958         struct ctdb_node_map *nodemap=NULL;
1959         TALLOC_CTX *tmp_ctx;
1960         uint32_t changed_flags;
1961         int i;
1962         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1963         int disabled_flag_changed;
1964
1965         if (data.dsize != sizeof(*c)) {
1966                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
1967                 return;
1968         }
1969
1970         tmp_ctx = talloc_new(ctdb);
1971         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
1972
1973         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1974         if (ret != 0) {
1975                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
1976                 talloc_free(tmp_ctx);
1977                 return;         
1978         }
1979
1980
1981         for (i=0;i<nodemap->num;i++) {
1982                 if (nodemap->nodes[i].pnn == c->pnn) break;
1983         }
1984
1985         if (i == nodemap->num) {
1986                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
1987                 talloc_free(tmp_ctx);
1988                 return;
1989         }
1990
1991         changed_flags = c->old_flags ^ c->new_flags;
1992
1993         if (nodemap->nodes[i].flags != c->new_flags) {
1994                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
1995         }
1996
1997         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
1998
1999         nodemap->nodes[i].flags = c->new_flags;
2000
2001         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2002                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2003
2004         if (ret == 0) {
2005                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2006                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2007         }
2008         
2009         if (ret == 0 &&
2010             ctdb->recovery_master == ctdb->pnn &&
2011             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2012                 /* Only do the takeover run if the perm disabled or unhealthy
2013                    flags changed since these will cause an ip failover but not
2014                    a recovery.
2015                    If the node became disconnected or banned this will also
2016                    lead to an ip address failover but that is handled 
2017                    during recovery
2018                 */
2019                 if (disabled_flag_changed) {
2020                         rec->need_takeover_run = true;
2021                 }
2022         }
2023
2024         talloc_free(tmp_ctx);
2025 }
2026
2027 /*
2028   handler for when we need to push out flag changes ot all other nodes
2029 */
2030 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2031                             TDB_DATA data, void *private_data)
2032 {
2033         int ret;
2034         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2035
2036         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), c->pnn, c->new_flags, ~c->new_flags);
2037         if (ret != 0) {
2038                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
2039         }
2040 }
2041
2042
2043 struct verify_recmode_normal_data {
2044         uint32_t count;
2045         enum monitor_result status;
2046 };
2047
2048 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2049 {
2050         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2051
2052
2053         /* one more node has responded with recmode data*/
2054         rmdata->count--;
2055
2056         /* if we failed to get the recmode, then return an error and let
2057            the main loop try again.
2058         */
2059         if (state->state != CTDB_CONTROL_DONE) {
2060                 if (rmdata->status == MONITOR_OK) {
2061                         rmdata->status = MONITOR_FAILED;
2062                 }
2063                 return;
2064         }
2065
2066         /* if we got a response, then the recmode will be stored in the
2067            status field
2068         */
2069         if (state->status != CTDB_RECOVERY_NORMAL) {
2070                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2071                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2072         }
2073
2074         return;
2075 }
2076
2077
2078 /* verify that all nodes are in normal recovery mode */
2079 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2080 {
2081         struct verify_recmode_normal_data *rmdata;
2082         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2083         struct ctdb_client_control_state *state;
2084         enum monitor_result status;
2085         int j;
2086         
2087         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2088         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2089         rmdata->count  = 0;
2090         rmdata->status = MONITOR_OK;
2091
2092         /* loop over all active nodes and send an async getrecmode call to 
2093            them*/
2094         for (j=0; j<nodemap->num; j++) {
2095                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2096                         continue;
2097                 }
2098                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2099                                         CONTROL_TIMEOUT(), 
2100                                         nodemap->nodes[j].pnn);
2101                 if (state == NULL) {
2102                         /* we failed to send the control, treat this as 
2103                            an error and try again next iteration
2104                         */                      
2105                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2106                         talloc_free(mem_ctx);
2107                         return MONITOR_FAILED;
2108                 }
2109
2110                 /* set up the callback functions */
2111                 state->async.fn = verify_recmode_normal_callback;
2112                 state->async.private_data = rmdata;
2113
2114                 /* one more control to wait for to complete */
2115                 rmdata->count++;
2116         }
2117
2118
2119         /* now wait for up to the maximum number of seconds allowed
2120            or until all nodes we expect a response from has replied
2121         */
2122         while (rmdata->count > 0) {
2123                 event_loop_once(ctdb->ev);
2124         }
2125
2126         status = rmdata->status;
2127         talloc_free(mem_ctx);
2128         return status;
2129 }
2130
2131
2132 struct verify_recmaster_data {
2133         struct ctdb_recoverd *rec;
2134         uint32_t count;
2135         uint32_t pnn;
2136         enum monitor_result status;
2137 };
2138
2139 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2140 {
2141         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2142
2143
2144         /* one more node has responded with recmaster data*/
2145         rmdata->count--;
2146
2147         /* if we failed to get the recmaster, then return an error and let
2148            the main loop try again.
2149         */
2150         if (state->state != CTDB_CONTROL_DONE) {
2151                 if (rmdata->status == MONITOR_OK) {
2152                         rmdata->status = MONITOR_FAILED;
2153                 }
2154                 return;
2155         }
2156
2157         /* if we got a response, then the recmaster will be stored in the
2158            status field
2159         */
2160         if (state->status != rmdata->pnn) {
2161                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2162                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2163                 rmdata->status = MONITOR_ELECTION_NEEDED;
2164         }
2165
2166         return;
2167 }
2168
2169
2170 /* verify that all nodes agree that we are the recmaster */
2171 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2172 {
2173         struct ctdb_context *ctdb = rec->ctdb;
2174         struct verify_recmaster_data *rmdata;
2175         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2176         struct ctdb_client_control_state *state;
2177         enum monitor_result status;
2178         int j;
2179         
2180         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2181         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2182         rmdata->rec    = rec;
2183         rmdata->count  = 0;
2184         rmdata->pnn    = pnn;
2185         rmdata->status = MONITOR_OK;
2186
2187         /* loop over all active nodes and send an async getrecmaster call to 
2188            them*/
2189         for (j=0; j<nodemap->num; j++) {
2190                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2191                         continue;
2192                 }
2193                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2194                                         CONTROL_TIMEOUT(),
2195                                         nodemap->nodes[j].pnn);
2196                 if (state == NULL) {
2197                         /* we failed to send the control, treat this as 
2198                            an error and try again next iteration
2199                         */                      
2200                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2201                         talloc_free(mem_ctx);
2202                         return MONITOR_FAILED;
2203                 }
2204
2205                 /* set up the callback functions */
2206                 state->async.fn = verify_recmaster_callback;
2207                 state->async.private_data = rmdata;
2208
2209                 /* one more control to wait for to complete */
2210                 rmdata->count++;
2211         }
2212
2213
2214         /* now wait for up to the maximum number of seconds allowed
2215            or until all nodes we expect a response from has replied
2216         */
2217         while (rmdata->count > 0) {
2218                 event_loop_once(ctdb->ev);
2219         }
2220
2221         status = rmdata->status;
2222         talloc_free(mem_ctx);
2223         return status;
2224 }
2225
2226
2227 /* called to check that the allocation of public ip addresses is ok.
2228 */
2229 static int verify_ip_allocation(struct ctdb_context *ctdb, uint32_t pnn)
2230 {
2231         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2232         struct ctdb_all_public_ips *ips = NULL;
2233         struct ctdb_uptime *uptime1 = NULL;
2234         struct ctdb_uptime *uptime2 = NULL;
2235         int ret, j;
2236
2237         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2238                                 CTDB_CURRENT_NODE, &uptime1);
2239         if (ret != 0) {
2240                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2241                 talloc_free(mem_ctx);
2242                 return -1;
2243         }
2244
2245         /* read the ip allocation from the local node */
2246         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2247         if (ret != 0) {
2248                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2249                 talloc_free(mem_ctx);
2250                 return -1;
2251         }
2252
2253         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2254                                 CTDB_CURRENT_NODE, &uptime2);
2255         if (ret != 0) {
2256                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2257                 talloc_free(mem_ctx);
2258                 return -1;
2259         }
2260
2261         /* skip the check if the startrecovery time has changed */
2262         if (timeval_compare(&uptime1->last_recovery_started,
2263                             &uptime2->last_recovery_started) != 0) {
2264                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2265                 talloc_free(mem_ctx);
2266                 return 0;
2267         }
2268
2269         /* skip the check if the endrecovery time has changed */
2270         if (timeval_compare(&uptime1->last_recovery_finished,
2271                             &uptime2->last_recovery_finished) != 0) {
2272                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2273                 talloc_free(mem_ctx);
2274                 return 0;
2275         }
2276
2277         /* skip the check if we have started but not finished recovery */
2278         if (timeval_compare(&uptime1->last_recovery_finished,
2279                             &uptime1->last_recovery_started) != 1) {
2280                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery. skipping public ip address check\n"));
2281                 talloc_free(mem_ctx);
2282
2283                 return 0;
2284         }
2285
2286         /* verify that we have the ip addresses we should have
2287            and we dont have ones we shouldnt have.
2288            if we find an inconsistency we set recmode to
2289            active on the local node and wait for the recmaster
2290            to do a full blown recovery
2291         */
2292         for (j=0; j<ips->num; j++) {
2293                 if (ips->ips[j].pnn == pnn) {
2294                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2295                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2296                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2297                                 ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2298                                 if (ret != 0) {
2299                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2300
2301                                         talloc_free(mem_ctx);
2302                                         return -1;
2303                                 }
2304                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2305                                 if (ret != 0) {
2306                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2307
2308                                         talloc_free(mem_ctx);
2309                                         return -1;
2310                                 }
2311                         }
2312                 } else {
2313                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2314                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2315                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2316
2317                                 ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2318                                 if (ret != 0) {
2319                                         DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to public ip address mismatches\n"));
2320
2321                                         talloc_free(mem_ctx);
2322                                         return -1;
2323                                 }
2324                                 ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2325                                 if (ret != 0) {
2326                                         DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to public ip address mismatches\n"));
2327
2328                                         talloc_free(mem_ctx);
2329                                         return -1;
2330                                 }
2331                         }
2332                 }
2333         }
2334
2335         talloc_free(mem_ctx);
2336         return 0;
2337 }
2338
2339
2340 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2341 {
2342         struct ctdb_node_map **remote_nodemaps = callback_data;
2343
2344         if (node_pnn >= ctdb->num_nodes) {
2345                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2346                 return;
2347         }
2348
2349         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2350
2351 }
2352
2353 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2354         struct ctdb_node_map *nodemap,
2355         struct ctdb_node_map **remote_nodemaps)
2356 {
2357         uint32_t *nodes;
2358
2359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2360         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2361                                         nodes, 0,
2362                                         CONTROL_TIMEOUT(), false, tdb_null,
2363                                         async_getnodemap_callback,
2364                                         NULL,
2365                                         remote_nodemaps) != 0) {
2366                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2367
2368                 return -1;
2369         }
2370
2371         return 0;
2372 }
2373
2374 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2375 struct ctdb_check_reclock_state {
2376         struct ctdb_context *ctdb;
2377         struct timeval start_time;
2378         int fd[2];
2379         pid_t child;
2380         struct timed_event *te;
2381         struct fd_event *fde;
2382         enum reclock_child_status status;
2383 };
2384
2385 /* when we free the reclock state we must kill any child process.
2386 */
2387 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2388 {
2389         struct ctdb_context *ctdb = state->ctdb;
2390
2391         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2392
2393         if (state->fd[0] != -1) {
2394                 close(state->fd[0]);
2395                 state->fd[0] = -1;
2396         }
2397         if (state->fd[1] != -1) {
2398                 close(state->fd[1]);
2399                 state->fd[1] = -1;
2400         }
2401         kill(state->child, SIGKILL);
2402         return 0;
2403 }
2404
2405 /*
2406   called if our check_reclock child times out. this would happen if
2407   i/o to the reclock file blocks.
2408  */
2409 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2410                                          struct timeval t, void *private_data)
2411 {
2412         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2413                                            struct ctdb_check_reclock_state);
2414
2415         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timedout CFS slow to grant locks?\n"));
2416         state->status = RECLOCK_TIMEOUT;
2417 }
2418
2419 /* this is called when the child process has completed checking the reclock
2420    file and has written data back to us through the pipe.
2421 */
2422 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2423                              uint16_t flags, void *private_data)
2424 {
2425         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2426                                              struct ctdb_check_reclock_state);
2427         char c = 0;
2428         int ret;
2429
2430         /* we got a response from our child process so we can abort the
2431            timeout.
2432         */
2433         talloc_free(state->te);
2434         state->te = NULL;
2435
2436         ret = read(state->fd[0], &c, 1);
2437         if (ret != 1 || c != RECLOCK_OK) {
2438                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2439                 state->status = RECLOCK_FAILED;
2440
2441                 return;
2442         }
2443
2444         state->status = RECLOCK_OK;
2445         return;
2446 }
2447
2448 static int check_recovery_lock(struct ctdb_context *ctdb)
2449 {
2450         int ret;
2451         struct ctdb_check_reclock_state *state;
2452         pid_t parent = getpid();
2453
2454         if (ctdb->recovery_lock_fd == -1) {
2455                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2456                 return -1;
2457         }
2458
2459         state = talloc(ctdb, struct ctdb_check_reclock_state);
2460         CTDB_NO_MEMORY(ctdb, state);
2461
2462         state->ctdb = ctdb;
2463         state->start_time = timeval_current();
2464         state->status = RECLOCK_CHECKING;
2465         state->fd[0] = -1;
2466         state->fd[1] = -1;
2467
2468         ret = pipe(state->fd);
2469         if (ret != 0) {
2470                 talloc_free(state);
2471                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2472                 return -1;
2473         }
2474
2475         state->child = fork();
2476         if (state->child == (pid_t)-1) {
2477                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2478                 close(state->fd[0]);
2479                 state->fd[0] = -1;
2480                 close(state->fd[1]);
2481                 state->fd[1] = -1;
2482                 talloc_free(state);
2483                 return -1;
2484         }
2485
2486         if (state->child == 0) {
2487                 char cc = RECLOCK_OK;
2488                 close(state->fd[0]);
2489                 state->fd[0] = -1;
2490
2491                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2492                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2493                         cc = RECLOCK_FAILED;
2494                 }
2495
2496                 write(state->fd[1], &cc, 1);
2497                 /* make sure we die when our parent dies */
2498                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2499                         sleep(5);
2500                         write(state->fd[1], &cc, 1);
2501                 }
2502                 _exit(0);
2503         }
2504         close(state->fd[1]);
2505         state->fd[1] = -1;
2506
2507         talloc_set_destructor(state, check_reclock_destructor);
2508
2509         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2510                                     ctdb_check_reclock_timeout, state);
2511         if (state->te == NULL) {
2512                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2513                 talloc_free(state);
2514                 return -1;
2515         }
2516
2517         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2518                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2519                                 reclock_child_handler,
2520                                 (void *)state);
2521
2522         if (state->fde == NULL) {
2523                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2524                 talloc_free(state);
2525                 return -1;
2526         }
2527
2528         while (state->status == RECLOCK_CHECKING) {
2529                 event_loop_once(ctdb->ev);
2530         }
2531
2532         if (state->status == RECLOCK_FAILED) {
2533                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2534                 close(ctdb->recovery_lock_fd);
2535                 ctdb->recovery_lock_fd = -1;
2536                 talloc_free(state);
2537                 return -1;
2538         }
2539
2540         talloc_free(state);
2541         return 0;
2542 }
2543
2544 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2545 {
2546         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2547         const char *reclockfile;
2548
2549         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2550                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2551                 talloc_free(tmp_ctx);
2552                 return -1;      
2553         }
2554
2555         if (reclockfile == NULL) {
2556                 if (ctdb->recovery_lock_file != NULL) {
2557                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2558                         talloc_free(ctdb->recovery_lock_file);
2559                         ctdb->recovery_lock_file = NULL;
2560                         if (ctdb->recovery_lock_fd != -1) {
2561                                 close(ctdb->recovery_lock_fd);
2562                                 ctdb->recovery_lock_fd = -1;
2563                         }
2564                 }
2565                 ctdb->tunable.verify_recovery_lock = 0;
2566                 talloc_free(tmp_ctx);
2567                 return 0;
2568         }
2569
2570         if (ctdb->recovery_lock_file == NULL) {
2571                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2572                 if (ctdb->recovery_lock_fd != -1) {
2573                         close(ctdb->recovery_lock_fd);
2574                         ctdb->recovery_lock_fd = -1;
2575                 }
2576                 talloc_free(tmp_ctx);
2577                 return 0;
2578         }
2579
2580
2581         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2582                 talloc_free(tmp_ctx);
2583                 return 0;
2584         }
2585
2586         talloc_free(ctdb->recovery_lock_file);
2587         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2588         ctdb->tunable.verify_recovery_lock = 0;
2589         if (ctdb->recovery_lock_fd != -1) {
2590                 close(ctdb->recovery_lock_fd);
2591                 ctdb->recovery_lock_fd = -1;
2592         }
2593
2594         talloc_free(tmp_ctx);
2595         return 0;
2596 }
2597                 
2598 /*
2599   the main monitoring loop
2600  */
2601 static void monitor_cluster(struct ctdb_context *ctdb)
2602 {
2603         uint32_t pnn;
2604         TALLOC_CTX *mem_ctx=NULL;
2605         struct ctdb_node_map *nodemap=NULL;
2606         struct ctdb_node_map *recmaster_nodemap=NULL;
2607         struct ctdb_node_map **remote_nodemaps=NULL;
2608         struct ctdb_vnn_map *vnnmap=NULL;
2609         struct ctdb_vnn_map *remote_vnnmap=NULL;
2610         int32_t debug_level;
2611         int i, j, ret;
2612         struct ctdb_recoverd *rec;
2613
2614         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2615
2616         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2617         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2618
2619         rec->ctdb = ctdb;
2620
2621         rec->priority_time = timeval_current();
2622
2623         /* register a message port for sending memory dumps */
2624         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2625
2626         /* register a message port for recovery elections */
2627         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2628
2629         /* when nodes are disabled/enabled */
2630         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2631
2632         /* when we are asked to puch out a flag change */
2633         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2634
2635         /* register a message port for vacuum fetch */
2636         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2637
2638         /* register a message port for reloadnodes  */
2639         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2640
2641         /* register a message port for performing a takeover run */
2642         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2643
2644         /* register a message port for disabling the ip check for a short while */
2645         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2646
2647 again:
2648         if (mem_ctx) {
2649                 talloc_free(mem_ctx);
2650                 mem_ctx = NULL;
2651         }
2652         mem_ctx = talloc_new(ctdb);
2653         if (!mem_ctx) {
2654                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2655                 exit(-1);
2656         }
2657
2658         /* we only check for recovery once every second */
2659         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2660
2661         /* verify that the main daemon is still running */
2662         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2663                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2664                 exit(-1);
2665         }
2666
2667         /* ping the local daemon to tell it we are alive */
2668         ctdb_ctrl_recd_ping(ctdb);
2669
2670         if (rec->election_timeout) {
2671                 /* an election is in progress */
2672                 goto again;
2673         }
2674
2675         /* read the debug level from the parent and update locally */
2676         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2677         if (ret !=0) {
2678                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2679                 goto again;
2680         }
2681         LogLevel = debug_level;
2682
2683
2684         /* We must check if we need to ban a node here but we want to do this
2685            as early as possible so we dont wait until we have pulled the node
2686            map from the local node. thats why we have the hardcoded value 20
2687         */
2688         for (i=0; i<ctdb->num_nodes; i++) {
2689                 struct ctdb_banning_state *ban_state;
2690
2691                 if (ctdb->nodes[i]->ban_state == NULL) {
2692                         continue;
2693                 }
2694                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2695                 if (ban_state->count < 20) {
2696                         continue;
2697                 }
2698                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2699                         ctdb->nodes[i]->pnn, ban_state->count,
2700                         ctdb->tunable.recovery_ban_period));
2701                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2702                 ban_state->count = 0;
2703         }
2704
2705         /* get relevant tunables */
2706         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2707         if (ret != 0) {
2708                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2709                 goto again;
2710         }
2711
2712         /* get the current recovery lock file from the server */
2713         if (update_recovery_lock_file(ctdb) != 0) {
2714                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2715                 goto again;
2716         }
2717
2718         /* Make sure that if recovery lock verification becomes disabled when
2719            we close the file
2720         */
2721         if (ctdb->tunable.verify_recovery_lock == 0) {
2722                 if (ctdb->recovery_lock_fd != -1) {
2723                         close(ctdb->recovery_lock_fd);
2724                         ctdb->recovery_lock_fd = -1;
2725                 }
2726         }
2727
2728         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2729         if (pnn == (uint32_t)-1) {
2730                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2731                 goto again;
2732         }
2733
2734         /* get the vnnmap */
2735         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2736         if (ret != 0) {
2737                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2738                 goto again;
2739         }
2740
2741
2742         /* get number of nodes */
2743         if (rec->nodemap) {
2744                 talloc_free(rec->nodemap);
2745                 rec->nodemap = NULL;
2746                 nodemap=NULL;
2747         }
2748         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2749         if (ret != 0) {
2750                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2751                 goto again;
2752         }
2753         nodemap = rec->nodemap;
2754
2755         /* check which node is the recovery master */
2756         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2757         if (ret != 0) {
2758                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2759                 goto again;
2760         }
2761
2762         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2763         if (rec->recmaster != pnn) {
2764                 if (rec->ip_reallocate_ctx != NULL) {
2765                         talloc_free(rec->ip_reallocate_ctx);
2766                         rec->ip_reallocate_ctx = NULL;
2767                         rec->reallocate_callers = NULL;
2768                 }
2769         }
2770         /* if there are takeovers requested, perform it and notify the waiters */
2771         if (rec->reallocate_callers) {
2772                 process_ipreallocate_requests(ctdb, rec);
2773         }
2774
2775         if (rec->recmaster == (uint32_t)-1) {
2776                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2777                 force_election(rec, pnn, nodemap);
2778                 goto again;
2779         }
2780
2781
2782         /* if the local daemon is STOPPED, we verify that the databases are
2783            also frozen and thet the recmode is set to active 
2784         */
2785         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2786                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2787                 if (ret != 0) {
2788                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2789                 }
2790                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2791                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2792
2793                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2794                         if (ret != 0) {
2795                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2796                                 goto again;
2797                         }
2798                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2799                         if (ret != 0) {
2800                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2801
2802                                 goto again;
2803                         }
2804                         goto again;
2805                 }
2806         }
2807         /* If the local node is stopped, verify we are not the recmaster 
2808            and yield this role if so
2809         */
2810         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2811                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2812                 force_election(rec, pnn, nodemap);
2813                 goto again;
2814         }
2815         
2816         /* check that we (recovery daemon) and the local ctdb daemon
2817            agrees on whether we are banned or not
2818         */
2819 //qqq
2820
2821         /* remember our own node flags */
2822         rec->node_flags = nodemap->nodes[pnn].flags;
2823
2824         /* count how many active nodes there are */
2825         rec->num_active    = 0;
2826         rec->num_connected = 0;
2827         for (i=0; i<nodemap->num; i++) {
2828                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2829                         rec->num_active++;
2830                 }
2831                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2832                         rec->num_connected++;
2833                 }
2834         }
2835
2836
2837         /* verify that the recmaster node is still active */
2838         for (j=0; j<nodemap->num; j++) {
2839                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2840                         break;
2841                 }
2842         }
2843
2844         if (j == nodemap->num) {
2845                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2846                 force_election(rec, pnn, nodemap);
2847                 goto again;
2848         }
2849
2850         /* if recovery master is disconnected we must elect a new recmaster */
2851         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2852                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2853                 force_election(rec, pnn, nodemap);
2854                 goto again;
2855         }
2856
2857         /* grap the nodemap from the recovery master to check if it is banned */
2858         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2859                                    mem_ctx, &recmaster_nodemap);
2860         if (ret != 0) {
2861                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2862                           nodemap->nodes[j].pnn));
2863                 goto again;
2864         }
2865
2866
2867         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2868                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2869                 force_election(rec, pnn, nodemap);
2870                 goto again;
2871         }
2872
2873
2874         /* verify that we have all ip addresses we should have and we dont
2875          * have addresses we shouldnt have.
2876          */ 
2877         if (ctdb->do_checkpublicip) {
2878                 if (rec->ip_check_disable_ctx == NULL) {
2879                         if (verify_ip_allocation(ctdb, pnn) != 0) {
2880                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
2881                                 goto again;
2882                         }
2883                 }
2884         }
2885
2886
2887         /* if we are not the recmaster then we do not need to check
2888            if recovery is needed
2889          */
2890         if (pnn != rec->recmaster) {
2891                 goto again;
2892         }
2893
2894
2895         /* ensure our local copies of flags are right */
2896         ret = update_local_flags(rec, nodemap);
2897         if (ret == MONITOR_ELECTION_NEEDED) {
2898                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
2899                 force_election(rec, pnn, nodemap);
2900                 goto again;
2901         }
2902         if (ret != MONITOR_OK) {
2903                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
2904                 goto again;
2905         }
2906
2907         /* update the list of public ips that a node can handle for
2908            all connected nodes
2909         */
2910         if (ctdb->num_nodes != nodemap->num) {
2911                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
2912                 reload_nodes_file(ctdb);
2913                 goto again;
2914         }
2915         for (j=0; j<nodemap->num; j++) {
2916                 /* release any existing data */
2917                 if (ctdb->nodes[j]->public_ips) {
2918                         talloc_free(ctdb->nodes[j]->public_ips);
2919                         ctdb->nodes[j]->public_ips = NULL;
2920                 }
2921
2922                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2923                         continue;
2924                 }
2925
2926                 /* grab a new shiny list of public ips from the node */
2927                 if (ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(),
2928                         ctdb->nodes[j]->pnn, 
2929                         ctdb->nodes,
2930                         &ctdb->nodes[j]->public_ips)) {
2931                         DEBUG(DEBUG_ERR,("Failed to read public ips from node : %u\n", 
2932                                 ctdb->nodes[j]->pnn));
2933                         goto again;
2934                 }
2935         }
2936
2937
2938         /* verify that all active nodes agree that we are the recmaster */
2939         switch (verify_recmaster(rec, nodemap, pnn)) {
2940         case MONITOR_RECOVERY_NEEDED:
2941                 /* can not happen */
2942                 goto again;
2943         case MONITOR_ELECTION_NEEDED:
2944                 force_election(rec, pnn, nodemap);
2945                 goto again;
2946         case MONITOR_OK:
2947                 break;
2948         case MONITOR_FAILED:
2949                 goto again;
2950         }
2951
2952
2953         if (rec->need_recovery) {
2954                 /* a previous recovery didn't finish */
2955                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2956                 goto again;             
2957         }
2958
2959         /* verify that all active nodes are in normal mode 
2960            and not in recovery mode 
2961         */
2962         switch (verify_recmode(ctdb, nodemap)) {
2963         case MONITOR_RECOVERY_NEEDED:
2964                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2965                 goto again;
2966         case MONITOR_FAILED:
2967                 goto again;
2968         case MONITOR_ELECTION_NEEDED:
2969                 /* can not happen */
2970         case MONITOR_OK:
2971                 break;
2972         }
2973
2974
2975         if (ctdb->tunable.verify_recovery_lock != 0) {
2976                 /* we should have the reclock - check its not stale */
2977                 ret = check_recovery_lock(ctdb);
2978                 if (ret != 0) {
2979                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
2980                         ctdb_set_culprit(rec, ctdb->pnn);
2981                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
2982                         goto again;
2983                 }
2984         }
2985
2986         /* get the nodemap for all active remote nodes
2987          */
2988         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
2989         if (remote_nodemaps == NULL) {
2990                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
2991                 goto again;
2992         }
2993         for(i=0; i<nodemap->num; i++) {
2994                 remote_nodemaps[i] = NULL;
2995         }
2996         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
2997                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
2998                 goto again;
2999         } 
3000
3001         /* verify that all other nodes have the same nodemap as we have
3002         */
3003         for (j=0; j<nodemap->num; j++) {
3004                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3005                         continue;
3006                 }
3007
3008                 if (remote_nodemaps[j] == NULL) {
3009                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3010                         ctdb_set_culprit(rec, j);
3011
3012                         goto again;
3013                 }
3014
3015                 /* if the nodes disagree on how many nodes there are
3016                    then this is a good reason to try recovery
3017                  */
3018                 if (remote_nodemaps[j]->num != nodemap->num) {
3019                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3020                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3021                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3022                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3023                         goto again;
3024                 }
3025
3026                 /* if the nodes disagree on which nodes exist and are
3027                    active, then that is also a good reason to do recovery
3028                  */
3029                 for (i=0;i<nodemap->num;i++) {
3030                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3031                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3032                                           nodemap->nodes[j].pnn, i, 
3033                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3034                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3035                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3036                                             vnnmap);
3037                                 goto again;
3038                         }
3039                 }
3040
3041                 /* verify the flags are consistent
3042                 */
3043                 for (i=0; i<nodemap->num; i++) {
3044                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3045                                 continue;
3046                         }
3047                         
3048                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3049                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3050                                   nodemap->nodes[j].pnn, 
3051                                   nodemap->nodes[i].pnn, 
3052                                   remote_nodemaps[j]->nodes[i].flags,
3053                                   nodemap->nodes[j].flags));
3054                                 if (i == j) {
3055                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3056                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3057                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3058                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3059                                                     vnnmap);
3060                                         goto again;
3061                                 } else {
3062                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3063                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3064                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3065                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3066                                                     vnnmap);
3067                                         goto again;
3068                                 }
3069                         }
3070                 }
3071         }
3072
3073
3074         /* there better be the same number of lmasters in the vnn map
3075            as there are active nodes or we will have to do a recovery
3076          */
3077         if (vnnmap->size != rec->num_active) {
3078                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3079                           vnnmap->size, rec->num_active));
3080                 ctdb_set_culprit(rec, ctdb->pnn);
3081                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3082                 goto again;
3083         }
3084
3085         /* verify that all active nodes in the nodemap also exist in 
3086            the vnnmap.
3087          */
3088         for (j=0; j<nodemap->num; j++) {
3089                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3090                         continue;
3091                 }
3092                 if (nodemap->nodes[j].pnn == pnn) {
3093                         continue;
3094                 }
3095
3096                 for (i=0; i<vnnmap->size; i++) {
3097                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3098                                 break;
3099                         }
3100                 }
3101                 if (i == vnnmap->size) {
3102                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3103                                   nodemap->nodes[j].pnn));
3104                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3105                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3106                         goto again;
3107                 }
3108         }
3109
3110         
3111         /* verify that all other nodes have the same vnnmap
3112            and are from the same generation
3113          */
3114         for (j=0; j<nodemap->num; j++) {
3115                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3116                         continue;
3117                 }
3118                 if (nodemap->nodes[j].pnn == pnn) {
3119                         continue;
3120                 }
3121
3122                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3123                                           mem_ctx, &remote_vnnmap);
3124                 if (ret != 0) {
3125                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3126                                   nodemap->nodes[j].pnn));
3127                         goto again;
3128                 }
3129
3130                 /* verify the vnnmap generation is the same */
3131                 if (vnnmap->generation != remote_vnnmap->generation) {
3132                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3133                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3134                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3135                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3136                         goto again;
3137                 }
3138
3139                 /* verify the vnnmap size is the same */
3140                 if (vnnmap->size != remote_vnnmap->size) {
3141                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3142                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3143                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3144                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3145                         goto again;
3146                 }
3147
3148                 /* verify the vnnmap is the same */
3149                 for (i=0;i<vnnmap->size;i++) {
3150                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3151                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3152                                           nodemap->nodes[j].pnn));
3153                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3154                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3155                                             vnnmap);
3156                                 goto again;
3157                         }
3158                 }
3159         }
3160
3161         /* we might need to change who has what IP assigned */
3162         if (rec->need_takeover_run) {
3163                 rec->need_takeover_run = false;
3164
3165                 /* execute the "startrecovery" event script on all nodes */
3166                 ret = run_startrecovery_eventscript(rec, nodemap);
3167                 if (ret!=0) {
3168                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3169                         ctdb_set_culprit(rec, ctdb->pnn);
3170                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3171                 }
3172
3173                 ret = ctdb_takeover_run(ctdb, nodemap);
3174                 if (ret != 0) {
3175                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3176                         ctdb_set_culprit(rec, ctdb->pnn);
3177                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3178                 }
3179
3180                 /* execute the "recovered" event script on all nodes */
3181                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3182 #if 0
3183 // we cant check whether the event completed successfully
3184 // since this script WILL fail if the node is in recovery mode
3185 // and if that race happens, the code here would just cause a second
3186 // cascading recovery.
3187                 if (ret!=0) {
3188                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3189                         ctdb_set_culprit(rec, ctdb->pnn);
3190                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3191                 }
3192 #endif
3193         }
3194
3195
3196         goto again;
3197
3198 }
3199
3200 /*
3201   event handler for when the main ctdbd dies
3202  */
3203 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3204                                  uint16_t flags, void *private_data)
3205 {
3206         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3207         _exit(1);
3208 }
3209
3210 /*
3211   called regularly to verify that the recovery daemon is still running
3212  */
3213 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3214                               struct timeval yt, void *p)
3215 {
3216         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3217
3218         if (kill(ctdb->recoverd_pid, 0) != 0) {
3219                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3220
3221                 ctdb_stop_recoverd(ctdb);
3222                 ctdb_stop_keepalive(ctdb);
3223                 ctdb_stop_monitoring(ctdb);
3224                 ctdb_release_all_ips(ctdb);
3225                 if (ctdb->methods != NULL) {
3226                         ctdb->methods->shutdown(ctdb);
3227                 }
3228                 ctdb_event_script(ctdb, "shutdown");
3229
3230                 exit(10);       
3231         }
3232
3233         event_add_timed(ctdb->ev, ctdb, 
3234                         timeval_current_ofs(30, 0),
3235                         ctdb_check_recd, ctdb);
3236 }
3237
3238 static void recd_sig_child_handler(struct event_context *ev,
3239         struct signal_event *se, int signum, int count,
3240         void *dont_care, 
3241         void *private_data)
3242 {
3243 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3244         int status;
3245         pid_t pid = -1;
3246
3247         while (pid != 0) {
3248                 pid = waitpid(-1, &status, WNOHANG);
3249                 if (pid == -1) {
3250                         if (errno != ECHILD) {
3251                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3252                         }
3253                         return;
3254                 }
3255                 if (pid > 0) {
3256                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3257                 }
3258         }
3259 }
3260
3261 /*
3262   startup the recovery daemon as a child of the main ctdb daemon
3263  */
3264 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3265 {
3266         int fd[2];
3267         struct signal_event *se;
3268
3269         if (pipe(fd) != 0) {
3270                 return -1;
3271         }
3272
3273         ctdb->ctdbd_pid = getpid();
3274
3275         ctdb->recoverd_pid = fork();
3276         if (ctdb->recoverd_pid == -1) {
3277                 return -1;
3278         }
3279         
3280         if (ctdb->recoverd_pid != 0) {
3281                 close(fd[0]);
3282                 event_add_timed(ctdb->ev, ctdb, 
3283                                 timeval_current_ofs(30, 0),
3284                                 ctdb_check_recd, ctdb);
3285                 return 0;
3286         }
3287
3288         close(fd[1]);
3289
3290         srandom(getpid() ^ time(NULL));
3291
3292         if (switch_from_server_to_client(ctdb) != 0) {
3293                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
3294                 exit(1);
3295         }
3296
3297         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3298                      ctdb_recoverd_parent, &fd[0]);     
3299
3300         /* set up a handler to pick up sigchld */
3301         se = event_add_signal(ctdb->ev, ctdb,
3302                                      SIGCHLD, 0,
3303                                      recd_sig_child_handler,
3304                                      ctdb);
3305         if (se == NULL) {
3306                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3307                 exit(1);
3308         }
3309
3310         monitor_cluster(ctdb);
3311
3312         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3313         return -1;
3314 }
3315
3316 /*
3317   shutdown the recovery daemon
3318  */
3319 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3320 {
3321         if (ctdb->recoverd_pid == 0) {
3322                 return;
3323         }
3324
3325         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3326         kill(ctdb->recoverd_pid, SIGTERM);
3327 }