server: monitor interfaces in verify_ip_allocation()
[rusty/ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/events/events.h"
22 #include "system/filesys.h"
23 #include "system/time.h"
24 #include "system/network.h"
25 #include "system/wait.h"
26 #include "popt.h"
27 #include "cmdline.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include "db_wrap.h"
31 #include "dlinklist.h"
32
33
34 /* list of "ctdb ipreallocate" processes to call back when we have
35    finished the takeover run.
36 */
37 struct ip_reallocate_list {
38         struct ip_reallocate_list *next;
39         struct rd_memdump_reply *rd;
40 };
41
42 struct ctdb_banning_state {
43         uint32_t count;
44         struct timeval last_reported_time;
45 };
46
47 /*
48   private state of recovery daemon
49  */
50 struct ctdb_recoverd {
51         struct ctdb_context *ctdb;
52         uint32_t recmaster;
53         uint32_t num_active;
54         uint32_t num_connected;
55         uint32_t last_culprit_node;
56         struct ctdb_node_map *nodemap;
57         struct timeval priority_time;
58         bool need_takeover_run;
59         bool need_recovery;
60         uint32_t node_flags;
61         struct timed_event *send_election_te;
62         struct timed_event *election_timeout;
63         struct vacuum_info *vacuum_info;
64         TALLOC_CTX *ip_reallocate_ctx;
65         struct ip_reallocate_list *reallocate_callers;
66         TALLOC_CTX *ip_check_disable_ctx;
67         struct ctdb_control_get_ifaces *ifaces;
68 };
69
70 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
71 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
72
73
74 /*
75   ban a node for a period of time
76  */
77 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
78 {
79         int ret;
80         struct ctdb_context *ctdb = rec->ctdb;
81         struct ctdb_ban_time bantime;
82        
83         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
84
85         if (!ctdb_validate_pnn(ctdb, pnn)) {
86                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
87                 return;
88         }
89
90         bantime.pnn  = pnn;
91         bantime.time = ban_time;
92
93         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
94         if (ret != 0) {
95                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
96                 return;
97         }
98
99 }
100
101 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
102
103
104 /*
105   run the "recovered" eventscript on all nodes
106  */
107 static int run_recovered_eventscript(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, const char *caller)
108 {
109         TALLOC_CTX *tmp_ctx;
110         uint32_t *nodes;
111
112         tmp_ctx = talloc_new(ctdb);
113         CTDB_NO_MEMORY(ctdb, tmp_ctx);
114
115         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
116         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
117                                         nodes, 0,
118                                         CONTROL_TIMEOUT(), false, tdb_null,
119                                         NULL, NULL,
120                                         NULL) != 0) {
121                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
122
123                 talloc_free(tmp_ctx);
124                 return -1;
125         }
126
127         talloc_free(tmp_ctx);
128         return 0;
129 }
130
131 /*
132   remember the trouble maker
133  */
134 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
135 {
136         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
137         struct ctdb_banning_state *ban_state;
138
139         if (culprit >= ctdb->num_nodes) {
140                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
141                 return;
142         }
143
144         if (ctdb->nodes[culprit]->ban_state == NULL) {
145                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
146                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
147
148                 
149         }
150         ban_state = ctdb->nodes[culprit]->ban_state;
151         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
152                 /* this was the first time in a long while this node
153                    misbehaved so we will forgive any old transgressions.
154                 */
155                 ban_state->count = 0;
156         }
157
158         ban_state->count += count;
159         ban_state->last_reported_time = timeval_current();
160         rec->last_culprit_node = culprit;
161 }
162
163 /*
164   remember the trouble maker
165  */
166 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
167 {
168         ctdb_set_culprit_count(rec, culprit, 1);
169 }
170
171
172 /* this callback is called for every node that failed to execute the
173    start recovery event
174 */
175 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
176 {
177         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
178
179         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
180
181         ctdb_set_culprit(rec, node_pnn);
182 }
183
184 /*
185   run the "startrecovery" eventscript on all nodes
186  */
187 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
188 {
189         TALLOC_CTX *tmp_ctx;
190         uint32_t *nodes;
191         struct ctdb_context *ctdb = rec->ctdb;
192
193         tmp_ctx = talloc_new(ctdb);
194         CTDB_NO_MEMORY(ctdb, tmp_ctx);
195
196         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
198                                         nodes, 0,
199                                         CONTROL_TIMEOUT(), false, tdb_null,
200                                         NULL,
201                                         startrecovery_fail_callback,
202                                         rec) != 0) {
203                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
204                 talloc_free(tmp_ctx);
205                 return -1;
206         }
207
208         talloc_free(tmp_ctx);
209         return 0;
210 }
211
212 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
213 {
214         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
215                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback: %u %p\n", (unsigned)outdata.dsize, outdata.dptr));
216                 return;
217         }
218         if (node_pnn < ctdb->num_nodes) {
219                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
220         }
221 }
222
223 /*
224   update the node capabilities for all connected nodes
225  */
226 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
227 {
228         uint32_t *nodes;
229         TALLOC_CTX *tmp_ctx;
230
231         tmp_ctx = talloc_new(ctdb);
232         CTDB_NO_MEMORY(ctdb, tmp_ctx);
233
234         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
235         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
236                                         nodes, 0,
237                                         CONTROL_TIMEOUT(),
238                                         false, tdb_null,
239                                         async_getcap_callback, NULL,
240                                         NULL) != 0) {
241                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
242                 talloc_free(tmp_ctx);
243                 return -1;
244         }
245
246         talloc_free(tmp_ctx);
247         return 0;
248 }
249
250 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
251 {
252         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
253
254         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
255         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
256 }
257
258 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
259 {
260         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
261
262         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
263         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
264 }
265
266 /*
267   change recovery mode on all nodes
268  */
269 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
270 {
271         TDB_DATA data;
272         uint32_t *nodes;
273         TALLOC_CTX *tmp_ctx;
274
275         tmp_ctx = talloc_new(ctdb);
276         CTDB_NO_MEMORY(ctdb, tmp_ctx);
277
278         /* freeze all nodes */
279         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
280         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
281                 int i;
282
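                /* when entering recovery, a separate freeze control is sent
                   for each database priority level, so every priority band
                   is frozen before the recovery mode itself is changed */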
283                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
284                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
285                                                 nodes, i,
286                                                 CONTROL_TIMEOUT(),
287                                                 false, tdb_null,
288                                                 NULL,
289                                                 set_recmode_fail_callback,
290                                                 rec) != 0) {
291                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
292                                 talloc_free(tmp_ctx);
293                                 return -1;
294                         }
295                 }
296         }
297
298
299         data.dsize = sizeof(uint32_t);
300         data.dptr = (unsigned char *)&rec_mode;
301
302         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
303                                         nodes, 0,
304                                         CONTROL_TIMEOUT(),
305                                         false, data,
306                                         NULL, NULL,
307                                         NULL) != 0) {
308                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
309                 talloc_free(tmp_ctx);
310                 return -1;
311         }
312
313         talloc_free(tmp_ctx);
314         return 0;
315 }
316
317 /*
318   change recovery master on all nodes
319  */
320 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
321 {
322         TDB_DATA data;
323         TALLOC_CTX *tmp_ctx;
324         uint32_t *nodes;
325
326         tmp_ctx = talloc_new(ctdb);
327         CTDB_NO_MEMORY(ctdb, tmp_ctx);
328
329         data.dsize = sizeof(uint32_t);
330         data.dptr = (unsigned char *)&pnn;
331
332         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
333         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
334                                         nodes, 0,
335                                         CONTROL_TIMEOUT(), false, data,
336                                         NULL, NULL,
337                                         NULL) != 0) {
338                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
339                 talloc_free(tmp_ctx);
340                 return -1;
341         }
342
343         talloc_free(tmp_ctx);
344         return 0;
345 }
346
347 /* update all remote nodes to use the same db priority that we have.
348    This can fail if the remote node has not yet been upgraded to
349    support this function, so we always return success and never fail
350    a recovery if this call fails.
351 */
352 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
353         struct ctdb_node_map *nodemap, 
354         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
355 {
356         int db;
357         uint32_t *nodes;
358
359         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
360
361         /* step through all local databases */
362         for (db=0; db<dbmap->num;db++) {
363                 TDB_DATA data;
364                 struct ctdb_db_priority db_prio;
365                 int ret;
366
367                 db_prio.db_id     = dbmap->dbs[db].dbid;
368                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
369                 if (ret != 0) {
370                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
371                         continue;
372                 }
373
374                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
375
376                 data.dptr  = (uint8_t *)&db_prio;
377                 data.dsize = sizeof(db_prio);
378
379                 if (ctdb_client_async_control(ctdb,
380                                         CTDB_CONTROL_SET_DB_PRIORITY,
381                                         nodes, 0,
382                                         CONTROL_TIMEOUT(), false, data,
383                                         NULL, NULL,
384                                         NULL) != 0) {
385                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
386                 }
387         }
388
389         return 0;
390 }                       
391
392 /*
393   ensure all other nodes have attached to any databases that we have
394  */
395 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
396                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
397 {
398         int i, j, db, ret;
399         struct ctdb_dbid_map *remote_dbmap;
400
401         /* verify that all other nodes have all our databases */
402         for (j=0; j<nodemap->num; j++) {
403                 /* we don't need to check ourselves */
404                 if (nodemap->nodes[j].pnn == pnn) {
405                         continue;
406                 }
407                 /* don't check nodes that are unavailable */
408                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
409                         continue;
410                 }
411
412                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
413                                          mem_ctx, &remote_dbmap);
414                 if (ret != 0) {
415                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
416                         return -1;
417                 }
418
419                 /* step through all local databases */
420                 for (db=0; db<dbmap->num;db++) {
421                         const char *name;
422
423
424                         for (i=0;i<remote_dbmap->num;i++) {
425                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
426                                         break;
427                                 }
428                         }
429                         /* the remote node already has this database */
430                         if (i!=remote_dbmap->num) {
431                                 continue;
432                         }
433                         /* ok so we need to create this database */
434                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn, dbmap->dbs[db].dbid, 
435                                             mem_ctx, &name);
436                         if (ret != 0) {
437                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
438                                 return -1;
439                         }
440                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
441                                            mem_ctx, name, dbmap->dbs[db].persistent);
442                         if (ret != 0) {
443                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
444                                 return -1;
445                         }
446                 }
447         }
448
449         return 0;
450 }
451
452
453 /*
454   ensure we are attached to any databases that anyone else is attached to
455  */
456 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
457                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
458 {
459         int i, j, db, ret;
460         struct ctdb_dbid_map *remote_dbmap;
461
462         /* verify that we have all databases any other node has */
463         for (j=0; j<nodemap->num; j++) {
464                 /* we don't need to check ourselves */
465                 if (nodemap->nodes[j].pnn == pnn) {
466                         continue;
467                 }
468                 /* don't check nodes that are unavailable */
469                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
470                         continue;
471                 }
472
473                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
474                                          mem_ctx, &remote_dbmap);
475                 if (ret != 0) {
476                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
477                         return -1;
478                 }
479
480                 /* step through all databases on the remote node */
481                 for (db=0; db<remote_dbmap->num;db++) {
482                         const char *name;
483
484                         for (i=0;i<(*dbmap)->num;i++) {
485                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
486                                         break;
487                                 }
488                         }
489                         /* we already have this db locally */
490                         if (i!=(*dbmap)->num) {
491                                 continue;
492                         }
493                         /* ok so we need to create this database and
494                            rebuild dbmap
495                          */
496                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
497                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
498                         if (ret != 0) {
499                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
500                                           nodemap->nodes[j].pnn));
501                                 return -1;
502                         }
503                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
504                                            remote_dbmap->dbs[db].persistent);
505                         if (ret != 0) {
506                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
507                                 return -1;
508                         }
509                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
510                         if (ret != 0) {
511                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
512                                 return -1;
513                         }
514                 }
515         }
516
517         return 0;
518 }
519
520
521 /*
522   pull the remote database contents from one node into the recdb
523  */
524 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
525                                     struct tdb_wrap *recdb, uint32_t dbid,
526                                     bool persistent)
527 {
528         int ret;
529         TDB_DATA outdata;
530         struct ctdb_marshall_buffer *reply;
531         struct ctdb_rec_data *rec;
532         int i;
533         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
534
535         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
536                                CONTROL_TIMEOUT(), &outdata);
537         if (ret != 0) {
538                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
539                 talloc_free(tmp_ctx);
540                 return -1;
541         }
542
543         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
544
545         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
546                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
547                 talloc_free(tmp_ctx);
548                 return -1;
549         }
550         
551         rec = (struct ctdb_rec_data *)&reply->data[0];
552         
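        /* the pulled records are packed back to back in the marshall
           buffer; rec->length is used below to step from one
           ctdb_rec_data record to the next */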
553         for (i=0;
554              i<reply->count;
555              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
556                 TDB_DATA key, data;
557                 struct ctdb_ltdb_header *hdr;
558                 TDB_DATA existing;
559                 
560                 key.dptr = &rec->data[0];
561                 key.dsize = rec->keylen;
562                 data.dptr = &rec->data[key.dsize];
563                 data.dsize = rec->datalen;
564                 
565                 hdr = (struct ctdb_ltdb_header *)data.dptr;
566
567                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
568                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
569                         talloc_free(tmp_ctx);
570                         return -1;
571                 }
572
573                 /* fetch the existing record, if any */
574                 existing = tdb_fetch(recdb->tdb, key);
575                 
576                 if (existing.dptr != NULL) {
577                         struct ctdb_ltdb_header header;
578                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
579                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
580                                          (unsigned)existing.dsize, srcnode));
581                                 free(existing.dptr);
582                                 talloc_free(tmp_ctx);
583                                 return -1;
584                         }
585                         header = *(struct ctdb_ltdb_header *)existing.dptr;
586                         free(existing.dptr);
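                        /* only keep the pulled copy if it has a newer rsn
                           than the record we already hold, or if the rsns
                           are equal and our existing copy is not owned by
                           the recovery master */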
587                         if (!(header.rsn < hdr->rsn ||
588                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
589                                 continue;
590                         }
591                 }
592                 
593                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
594                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
595                         talloc_free(tmp_ctx);
596                         return -1;                              
597                 }
598         }
599
600         talloc_free(tmp_ctx);
601
602         return 0;
603 }
604
605 /*
606   pull all the remote database contents into the recdb
607  */
608 static int pull_remote_database(struct ctdb_context *ctdb,
609                                 struct ctdb_recoverd *rec, 
610                                 struct ctdb_node_map *nodemap, 
611                                 struct tdb_wrap *recdb, uint32_t dbid,
612                                 bool persistent)
613 {
614         int j;
615
616         /* pull all records from all other nodes across onto this node
617            (this merges based on rsn)
618         */
619         for (j=0; j<nodemap->num; j++) {
620                 /* don't merge from nodes that are unavailable */
621                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
622                         continue;
623                 }
624                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid, persistent) != 0) {
625                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
626                                  nodemap->nodes[j].pnn));
627                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
628                         return -1;
629                 }
630         }
631         
632         return 0;
633 }
634
635
636 /*
637   update flags on all active nodes
638  */
639 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
640 {
641         int ret;
642
643         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
644         if (ret != 0) {
645                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
646                 return -1;
647         }
648
649         return 0;
650 }
651
652 /*
653   ensure all nodes have the same vnnmap we do
654  */
655 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
656                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
657 {
658         int j, ret;
659
660         /* push the new vnn map out to all the nodes */
661         for (j=0; j<nodemap->num; j++) {
662                 /* don't push to nodes that are unavailable */
663                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
664                         continue;
665                 }
666
667                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
668                 if (ret != 0) {
669                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
670                         return -1;
671                 }
672         }
673
674         return 0;
675 }
676
677
678 struct vacuum_info {
679         struct vacuum_info *next, *prev;
680         struct ctdb_recoverd *rec;
681         uint32_t srcnode;
682         struct ctdb_db_context *ctdb_db;
683         struct ctdb_marshall_buffer *recs;
684         struct ctdb_rec_data *r;
685 };
686
687 static void vacuum_fetch_next(struct vacuum_info *v);
688
689 /*
690   called when a vacuum fetch has completed - just free it and do the next one
691  */
692 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
693 {
694         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
695         talloc_free(state);
696         vacuum_fetch_next(v);
697 }
698
699
700 /*
701   process the next element from the vacuum list
702 */
703 static void vacuum_fetch_next(struct vacuum_info *v)
704 {
705         struct ctdb_call call;
706         struct ctdb_rec_data *r;
707
708         while (v->recs->count) {
709                 struct ctdb_client_call_state *state;
710                 TDB_DATA data;
711                 struct ctdb_ltdb_header *hdr;
712
713                 ZERO_STRUCT(call);
714                 call.call_id = CTDB_NULL_FUNC;
715                 call.flags = CTDB_IMMEDIATE_MIGRATION;
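                /* a no-op call with IMMEDIATE_MIGRATION set is used purely
                   to migrate the record (and its dmaster role) back to
                   this node */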
716
717                 r = v->r;
718                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
719                 v->recs->count--;
720
721                 call.key.dptr = &r->data[0];
722                 call.key.dsize = r->keylen;
723
724                 /* ensure we don't block this daemon - just skip a record if we can't get
725                    the chainlock */
726                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
727                         continue;
728                 }
729
730                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
731                 if (data.dptr == NULL) {
732                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
733                         continue;
734                 }
735
736                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
737                         free(data.dptr);
738                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
739                         continue;
740                 }
741                 
742                 hdr = (struct ctdb_ltdb_header *)data.dptr;
743                 if (hdr->dmaster == v->rec->ctdb->pnn) {
744                         /* it's already local */
745                         free(data.dptr);
746                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
747                         continue;
748                 }
749
750                 free(data.dptr);
751
752                 state = ctdb_call_send(v->ctdb_db, &call);
753                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
754                 if (state == NULL) {
755                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
756                         talloc_free(v);
757                         return;
758                 }
759                 state->async.fn = vacuum_fetch_callback;
760                 state->async.private_data = v;
761                 return;
762         }
763
764         talloc_free(v);
765 }
766
767
768 /*
769   destroy a vacuum info structure
770  */
771 static int vacuum_info_destructor(struct vacuum_info *v)
772 {
773         DLIST_REMOVE(v->rec->vacuum_info, v);
774         return 0;
775 }
776
777
778 /*
779   handler for vacuum fetch
780 */
781 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
782                                  TDB_DATA data, void *private_data)
783 {
784         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
785         struct ctdb_marshall_buffer *recs;
786         int ret, i;
787         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
788         const char *name;
789         struct ctdb_dbid_map *dbmap=NULL;
790         bool persistent = false;
791         struct ctdb_db_context *ctdb_db;
792         struct ctdb_rec_data *r;
793         uint32_t srcnode;
794         struct vacuum_info *v;
795
796         recs = (struct ctdb_marshall_buffer *)data.dptr;
797         r = (struct ctdb_rec_data *)&recs->data[0];
798
799         if (recs->count == 0) {
800                 talloc_free(tmp_ctx);
801                 return;
802         }
803
804         srcnode = r->reqid;
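        /* the sending node passes its pnn in the reqid field of the
           first marshalled record */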
805
806         for (v=rec->vacuum_info;v;v=v->next) {
807                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
808                         /* we're already working on records from this node */
809                         talloc_free(tmp_ctx);
810                         return;
811                 }
812         }
813
814         /* work out if the database is persistent */
815         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
816         if (ret != 0) {
817                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
818                 talloc_free(tmp_ctx);
819                 return;
820         }
821
822         for (i=0;i<dbmap->num;i++) {
823                 if (dbmap->dbs[i].dbid == recs->db_id) {
824                         persistent = dbmap->dbs[i].persistent;
825                         break;
826                 }
827         }
828         if (i == dbmap->num) {
829                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
830                 talloc_free(tmp_ctx);
831                 return;         
832         }
833
834         /* find the name of this database */
835         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
836                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
837                 talloc_free(tmp_ctx);
838                 return;
839         }
840
841         /* attach to it */
842         ctdb_db = ctdb_attach(ctdb, name, persistent, 0);
843         if (ctdb_db == NULL) {
844                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
845                 talloc_free(tmp_ctx);
846                 return;
847         }
848
849         v = talloc_zero(rec, struct vacuum_info);
850         if (v == NULL) {
851                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
852                 talloc_free(tmp_ctx);
853                 return;
854         }
855
856         v->rec = rec;
857         v->srcnode = srcnode;
858         v->ctdb_db = ctdb_db;
859         v->recs = talloc_memdup(v, recs, data.dsize);
860         if (v->recs == NULL) {
861                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
862                 talloc_free(v);
863                 talloc_free(tmp_ctx);
864                 return;         
865         }
866         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
867
868         DLIST_ADD(rec->vacuum_info, v);
869
870         talloc_set_destructor(v, vacuum_info_destructor);
871
872         vacuum_fetch_next(v);
873         talloc_free(tmp_ctx);
874 }
875
876
877 /*
878   called when ctdb_wait_timeout should finish
879  */
880 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
881                               struct timeval yt, void *p)
882 {
883         uint32_t *timed_out = (uint32_t *)p;
884         (*timed_out) = 1;
885 }
886
887 /*
888   wait for a given number of seconds
889  */
890 static void ctdb_wait_timeout(struct ctdb_context *ctdb, uint32_t secs)
891 {
892         uint32_t timed_out = 0;
893         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, 0), ctdb_wait_handler, &timed_out);
894         while (!timed_out) {
895                 event_loop_once(ctdb->ev);
896         }
897 }
898
899 /*
900   called when an election times out (ends)
901  */
902 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
903                                   struct timeval t, void *p)
904 {
905         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
906         rec->election_timeout = NULL;
907
908         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
909 }
910
911
912 /*
913   wait for an election to finish. It finishes election_timeout seconds after
914   the last election packet is received
915  */
916 static void ctdb_wait_election(struct ctdb_recoverd *rec)
917 {
918         struct ctdb_context *ctdb = rec->ctdb;
919         while (rec->election_timeout) {
920                 event_loop_once(ctdb->ev);
921         }
922 }
923
924 /*
925   Update our local flags from all remote connected nodes. 
926   This is only run when we are, or believe we are, the recovery master
927  */
928 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
929 {
930         int j;
931         struct ctdb_context *ctdb = rec->ctdb;
932         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
933
934         /* get the nodemap for all active remote nodes and verify
935            they are the same as for this node
936          */
937         for (j=0; j<nodemap->num; j++) {
938                 struct ctdb_node_map *remote_nodemap=NULL;
939                 int ret;
940
941                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
942                         continue;
943                 }
944                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
945                         continue;
946                 }
947
948                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
949                                            mem_ctx, &remote_nodemap);
950                 if (ret != 0) {
951                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
952                                   nodemap->nodes[j].pnn));
953                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
954                         talloc_free(mem_ctx);
955                         return MONITOR_FAILED;
956                 }
957                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
958                         /* We should tell our daemon about this so it
959                            updates its flags or else we will log the same 
960                            message again in the next iteration of recovery.
961                            Since we are the recovery master we can just as
962                            well update the flags on all nodes.
963                         */
964                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, nodemap->nodes[j].flags, ~nodemap->nodes[j].flags);
965                         if (ret != 0) {
966                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
967                                 return -1;
968                         }
969
970                         /* Update our local copy of the flags in the recovery
971                            daemon.
972                         */
973                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
974                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
975                                  nodemap->nodes[j].flags));
976                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
977                 }
978                 talloc_free(remote_nodemap);
979         }
980         talloc_free(mem_ctx);
981         return MONITOR_OK;
982 }
983
984
985 /* Create a new random generation id.
986    The generation id cannot be the INVALID_GENERATION id
987 */
988 static uint32_t new_generation(void)
989 {
990         uint32_t generation;
991
992         while (1) {
993                 generation = random();
994
995                 if (generation != INVALID_GENERATION) {
996                         break;
997                 }
998         }
999
1000         return generation;
1001 }
1002
1003
1004 /*
1005   create a temporary working database
1006  */
1007 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1008 {
1009         char *name;
1010         struct tdb_wrap *recdb;
1011         unsigned tdb_flags;
1012
1013         /* open up the temporary recovery database */
1014         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1015                                ctdb->db_directory_state,
1016                                ctdb->pnn);
1017         if (name == NULL) {
1018                 return NULL;
1019         }
1020         unlink(name);
1021
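        /* the scratch recovery database is only accessed by this daemon,
           so tdb locking can be skipped; TDB_NOMMAP is added when running
           under valgrind */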
1022         tdb_flags = TDB_NOLOCK;
1023         if (ctdb->valgrinding) {
1024                 tdb_flags |= TDB_NOMMAP;
1025         }
1026         tdb_flags |= TDB_DISALLOW_NESTING;
1027
1028         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1029                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1030         if (recdb == NULL) {
1031                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1032         }
1033
1034         talloc_free(name);
1035
1036         return recdb;
1037 }
1038
1039
1040 /* 
1041    a traverse function for pulling all relevant records from recdb
1042  */
1043 struct recdb_data {
1044         struct ctdb_context *ctdb;
1045         struct ctdb_marshall_buffer *recdata;
1046         uint32_t len;
1047         bool failed;
1048         bool persistent;
1049 };
1050
1051 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1052 {
1053         struct recdb_data *params = (struct recdb_data *)p;
1054         struct ctdb_rec_data *rec;
1055         struct ctdb_ltdb_header *hdr;
1056
1057         /* skip empty records */
1058         if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1059                 return 0;
1060         }
1061
1062         /* update the dmaster field to point to us (not for persistent databases) */
1063         hdr = (struct ctdb_ltdb_header *)data.dptr;
1064         if (!params->persistent) {
1065                 hdr->dmaster = params->ctdb->pnn;
1066         }
1067
1068         /* add the record to the blob ready to send to the nodes */
1069         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1070         if (rec == NULL) {
1071                 params->failed = true;
1072                 return -1;
1073         }
1074         params->recdata = talloc_realloc_size(NULL, params->recdata, rec->length + params->len);
1075         if (params->recdata == NULL) {
1076                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u (%u records)\n", 
1077                          rec->length + params->len, params->recdata->count));
1078                 params->failed = true;
1079                 return -1;
1080         }
1081         params->recdata->count++;
1082         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1083         params->len += rec->length;
1084         talloc_free(rec);
1085
1086         return 0;
1087 }
1088
1089 /*
1090   push the recdb database out to all nodes
1091  */
1092 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1093                                bool persistent,
1094                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1095 {
1096         struct recdb_data params;
1097         struct ctdb_marshall_buffer *recdata;
1098         TDB_DATA outdata;
1099         TALLOC_CTX *tmp_ctx;
1100         uint32_t *nodes;
1101
1102         tmp_ctx = talloc_new(ctdb);
1103         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1104
1105         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1106         CTDB_NO_MEMORY(ctdb, recdata);
1107
1108         recdata->db_id = dbid;
1109
1110         params.ctdb = ctdb;
1111         params.recdata = recdata;
1112         params.len = offsetof(struct ctdb_marshall_buffer, data);
1113         params.failed = false;
1114         params.persistent = persistent;
1115
1116         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1117                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1118                 talloc_free(params.recdata);
1119                 talloc_free(tmp_ctx);
1120                 return -1;
1121         }
1122
1123         if (params.failed) {
1124                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1125                 talloc_free(params.recdata);
1126                 talloc_free(tmp_ctx);
1127                 return -1;              
1128         }
1129
1130         recdata = params.recdata;
1131
1132         outdata.dptr = (void *)recdata;
1133         outdata.dsize = params.len;
1134
1135         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1136         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1137                                         nodes, 0,
1138                                         CONTROL_TIMEOUT(), false, outdata,
1139                                         NULL, NULL,
1140                                         NULL) != 0) {
1141                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1142                 talloc_free(recdata);
1143                 talloc_free(tmp_ctx);
1144                 return -1;
1145         }
1146
1147         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1148                   dbid, recdata->count));
1149
1150         talloc_free(recdata);
1151         talloc_free(tmp_ctx);
1152
1153         return 0;
1154 }
1155
1156
1157 /*
1158   go through a full recovery on one database 
1159  */
1160 static int recover_database(struct ctdb_recoverd *rec, 
1161                             TALLOC_CTX *mem_ctx,
1162                             uint32_t dbid,
1163                             bool persistent,
1164                             uint32_t pnn, 
1165                             struct ctdb_node_map *nodemap,
1166                             uint32_t transaction_id)
1167 {
1168         struct tdb_wrap *recdb;
1169         int ret;
1170         struct ctdb_context *ctdb = rec->ctdb;
1171         TDB_DATA data;
1172         struct ctdb_control_wipe_database w;
1173         uint32_t *nodes;
1174
1175         recdb = create_recdb(ctdb, mem_ctx);
1176         if (recdb == NULL) {
1177                 return -1;
1178         }
1179
1180         /* pull all remote databases onto the recdb */
1181         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1182         if (ret != 0) {
1183                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1184                 return -1;
1185         }
1186
1187         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1188
1189         /* wipe all the remote databases. This is safe as we are in a transaction */
1190         w.db_id = dbid;
1191         w.transaction_id = transaction_id;
1192
1193         data.dptr = (void *)&w;
1194         data.dsize = sizeof(w);
1195
1196         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1197         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1198                                         nodes, 0,
1199                                         CONTROL_TIMEOUT(), false, data,
1200                                         NULL, NULL,
1201                                         NULL) != 0) {
1202                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1203                 talloc_free(recdb);
1204                 return -1;
1205         }
1206         
1207         /* push out the correct database. This sets the dmaster and skips 
1208            the empty records */
1209         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1210         if (ret != 0) {
1211                 talloc_free(recdb);
1212                 return -1;
1213         }
1214
1215         /* all done with this database */
1216         talloc_free(recdb);
1217
1218         return 0;
1219 }
1220
1221 /*
1222   reload the nodes file 
1223 */
1224 static void reload_nodes_file(struct ctdb_context *ctdb)
1225 {
1226         ctdb->nodes = NULL;
1227         ctdb_load_nodes_file(ctdb);
1228 }
1229
1230         
1231 /*
1232   we are the recmaster, and recovery is needed - start a recovery run
1233  */
1234 static int do_recovery(struct ctdb_recoverd *rec, 
1235                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1236                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1237 {
1238         struct ctdb_context *ctdb = rec->ctdb;
1239         int i, j, ret;
1240         uint32_t generation;
1241         struct ctdb_dbid_map *dbmap;
1242         TDB_DATA data;
1243         uint32_t *nodes;
1244         struct timeval start_time;
1245
1246         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1247
1248         /* if recovery fails, force it again */
1249         rec->need_recovery = true;
1250
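        /* ban any node that has accumulated at least twice as many culprit
           credits as there are nodes in the cluster */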
1251         for (i=0; i<ctdb->num_nodes; i++) {
1252                 struct ctdb_banning_state *ban_state;
1253
1254                 if (ctdb->nodes[i]->ban_state == NULL) {
1255                         continue;
1256                 }
1257                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1258                 if (ban_state->count < 2*ctdb->num_nodes) {
1259                         continue;
1260                 }
1261                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
1262                         ctdb->nodes[i]->pnn, ban_state->count,
1263                         ctdb->tunable.recovery_ban_period));
1264                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1265                 ban_state->count = 0;
1266         }
1267
1268
1269         if (ctdb->tunable.verify_recovery_lock != 0) {
1270                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1271                 start_time = timeval_current();
1272                 if (!ctdb_recovery_lock(ctdb, true)) {
1273                         ctdb_set_culprit(rec, pnn);
1274                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery\n"));
1275                         return -1;
1276                 }
1277                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1278                 DEBUG(DEBUG_ERR,("Recovery lock taken successfully by recovery daemon\n"));
1279         }
1280
1281         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1282
1283         /* get a list of all databases */
1284         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1285         if (ret != 0) {
1286                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1287                 return -1;
1288         }
1289
1290         /* we do the db creation before we set the recovery mode, so the freeze happens
1291            on all databases we will be dealing with. */
1292
1293         /* verify that we have all the databases any other node has */
1294         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1295         if (ret != 0) {
1296                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1297                 return -1;
1298         }
1299
1300         /* verify that all other nodes have all our databases */
1301         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1302         if (ret != 0) {
1303                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1304                 return -1;
1305         }
1306         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1307
1308         /* update the database priority for all remote databases */
1309         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1310         if (ret != 0) {
1311                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1312         }
1313         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1314
1315
1316         /* set recovery mode to active on all nodes */
1317         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1318         if (ret != 0) {
1319                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1320                 return -1;
1321         }
1322
1323         /* execute the "startrecovery" event script on all nodes */
1324         ret = run_startrecovery_eventscript(rec, nodemap);
1325         if (ret!=0) {
1326                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1327                 return -1;
1328         }
1329
1330         /*
1331           update all nodes to have the same flags that we have
1332          */
1333         for (i=0;i<nodemap->num;i++) {
1334                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1335                         continue;
1336                 }
1337
1338                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1339                 if (ret != 0) {
1340                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1341                         return -1;
1342                 }
1343         }
1344
1345         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1346
1347         /* pick a new generation number */
1348         generation = new_generation();
1349
1350         /* change the vnnmap on this node to use the new generation 
1351            number but not on any other nodes.
1352            this guarantees that if we abort the recovery prematurely
1353            for some reason (a node stops responding?)
1354            that we can just return immediately and we will reenter
1355            recovery shortly again.
1356            I.e. we deliberately leave the cluster with an inconsistent
1357            generation id to allow us to abort recovery at any stage and
1358            just restart it from scratch.
1359          */
1360         vnnmap->generation = generation;
1361         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1362         if (ret != 0) {
1363                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1364                 return -1;
1365         }
1366
1367         data.dptr = (void *)&generation;
1368         data.dsize = sizeof(uint32_t);
1369
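        /* start a transaction on all active nodes, using the new generation
           number as the transaction id; the same id is passed to the
           per-database wipe during recover_database() */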
1370         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1371         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1372                                         nodes, 0,
1373                                         CONTROL_TIMEOUT(), false, data,
1374                                         NULL,
1375                                         transaction_start_fail_callback,
1376                                         rec) != 0) {
1377                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1378                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1379                                         nodes, 0,
1380                                         CONTROL_TIMEOUT(), false, tdb_null,
1381                                         NULL,
1382                                         NULL,
1383                                         NULL) != 0) {
1384                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1385                 }
1386                 return -1;
1387         }
1388
1389         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1390
1391         for (i=0;i<dbmap->num;i++) {
1392                 ret = recover_database(rec, mem_ctx,
1393                                        dbmap->dbs[i].dbid,
1394                                        dbmap->dbs[i].persistent,
1395                                        pnn, nodemap, generation);
1396                 if (ret != 0) {
1397                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1398                         return -1;
1399                 }
1400         }
1401
1402         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1403
1404         /* commit all the changes */
1405         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1406                                         nodes, 0,
1407                                         CONTROL_TIMEOUT(), false, data,
1408                                         NULL, NULL,
1409                                         NULL) != 0) {
1410                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1411                 return -1;
1412         }
1413
1414         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1415         
1416
1417         /* update the capabilities for all nodes */
1418         ret = update_capabilities(ctdb, nodemap);
1419         if (ret!=0) {
1420                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1421                 return -1;
1422         }
1423
1424         /* build a new vnn map with all the currently active and
1425            unbanned nodes */
1426         generation = new_generation();
1427         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1428         CTDB_NO_MEMORY(ctdb, vnnmap);
1429         vnnmap->generation = generation;
1430         vnnmap->size = 0;
1431         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1432         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1433         for (i=j=0;i<nodemap->num;i++) {
1434                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1435                         continue;
1436                 }
1437                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1438                         /* this node can not be an lmaster */
1439                         DEBUG(DEBUG_DEBUG, ("Node %d can't be an LMASTER, skipping it\n", i));
1440                         continue;
1441                 }
1442
1443                 vnnmap->size++;
1444                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1445                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1446                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1447
1448         }
1449         if (vnnmap->size == 0) {
1450                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1451                 vnnmap->size++;
1452                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1453                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1454                 vnnmap->map[0] = pnn;
1455         }       
1456
1457         /* update to the new vnnmap on all nodes */
1458         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1459         if (ret != 0) {
1460                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1461                 return -1;
1462         }
1463
1464         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1465
1466         /* update recmaster to point to us for all nodes */
1467         ret = set_recovery_master(ctdb, nodemap, pnn);
1468         if (ret!=0) {
1469                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1470                 return -1;
1471         }
1472
1473         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1474
1475         /*
1476           update all nodes to have the same flags that we have
1477          */
1478         for (i=0;i<nodemap->num;i++) {
1479                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1480                         continue;
1481                 }
1482
1483                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1484                 if (ret != 0) {
1485                         DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1486                         return -1;
1487                 }
1488         }
1489
1490         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1491
1492         /* disable recovery mode */
1493         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1494         if (ret != 0) {
1495                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1496                 return -1;
1497         }
1498
1499         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
1500
1501         /*
1502           tell nodes to takeover their public IPs
1503          */
1504         rec->need_takeover_run = false;
1505         ret = ctdb_takeover_run(ctdb, nodemap);
1506         if (ret != 0) {
1507                 DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses\n"));
1508                 return -1;
1509         }
1510         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - takeip finished\n"));
1511
1512         /* execute the "recovered" event script on all nodes */
1513         ret = run_recovered_eventscript(ctdb, nodemap, "do_recovery");
1514         if (ret!=0) {
1515                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
1516                 return -1;
1517         }
1518
1519         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
1520
1521         /* send a message to all clients telling them that the cluster 
1522            has been reconfigured */
1523         ctdb_send_message(ctdb, CTDB_BROADCAST_CONNECTED, CTDB_SRVID_RECONFIGURE, tdb_null);
1524
1525         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
1526
1527         rec->need_recovery = false;
1528
1529         /* we managed to complete a full recovery, make sure to forgive
1530            any past sins by the nodes that could now participate in the
1531            recovery.
1532         */
1533         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
1534         for (i=0;i<nodemap->num;i++) {
1535                 struct ctdb_banning_state *ban_state;
1536
1537                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1538                         continue;
1539                 }
1540
1541                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
1542                 if (ban_state == NULL) {
1543                         continue;
1544                 }
1545
1546                 ban_state->count = 0;
1547         }
1548
1549
1550         /* We just finished a recovery successfully. 
1551            We now wait for rerecovery_timeout before we allow 
1552            another recovery to take place.
1553         */
1554         DEBUG(DEBUG_NOTICE, (__location__ " New recoveries suppressed for the rerecovery timeout\n"));
1555         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
1556         DEBUG(DEBUG_NOTICE, (__location__ " Rerecovery timeout elapsed. Recovery reactivated.\n"));
1557
1558         return 0;
1559 }
1560
1561
1562 /*
1563   elections are won by first checking the number of connected nodes, then
1564   the priority time, then the pnn
1565  */
1566 struct election_message {
1567         uint32_t num_connected;
1568         struct timeval priority_time;
1569         uint32_t pnn;
1570         uint32_t node_flags;
1571 };
1572
1573 /*
1574   form this node's election data
1575  */
1576 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
1577 {
1578         int ret, i;
1579         struct ctdb_node_map *nodemap;
1580         struct ctdb_context *ctdb = rec->ctdb;
1581
1582         ZERO_STRUCTP(em);
1583
1584         em->pnn = rec->ctdb->pnn;
1585         em->priority_time = rec->priority_time;
1586
1587         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
1588         if (ret != 0) {
1589                 DEBUG(DEBUG_ERR,(__location__ " unable to get election data\n"));
1590                 return;
1591         }
1592
1593         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
1594         em->node_flags = rec->node_flags;
1595
1596         for (i=0;i<nodemap->num;i++) {
1597                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
1598                         em->num_connected++;
1599                 }
1600         }
1601
1602         /* we shouldn't try to win this election if we can't be a recmaster */
1603         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1604                 em->num_connected = 0;
1605                 em->priority_time = timeval_current();
1606         }
1607
1608         talloc_free(nodemap);
1609 }
1610
1611 /*
1612   see if the given election data wins
1613  */
1614 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
1615 {
1616         struct election_message myem;
1617         int cmp = 0;
1618
1619         ctdb_election_data(rec, &myem);
1620
1621         /* we can't win if we don't have the recmaster capability */
1622         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
1623                 return false;
1624         }
1625
1626         /* we can't win if we are banned */
1627         if (rec->node_flags & NODE_FLAGS_BANNED) {
1628                 return false;
1629         }       
1630
1631         /* we can't win if we are stopped */
1632         if (rec->node_flags & NODE_FLAGS_STOPPED) {
1633                 return false;
1634         }       
1635
1636         /* we will automatically win if the other node is banned */
1637         if (em->node_flags & NODE_FLAGS_BANNED) {
1638                 return true;
1639         }
1640
1641         /* we will automatically win if the other node is stopped */
1642         if (em->node_flags & NODE_FLAGS_STOPPED) {
1643                 return true;
1644         }
1645
1646         /* try to use the most connected node */
1647         if (cmp == 0) {
1648                 cmp = (int)myem.num_connected - (int)em->num_connected;
1649         }
1650
1651         /* then the longest running node */
1652         if (cmp == 0) {
1653                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
1654         }
1655
1656         if (cmp == 0) {
1657                 cmp = (int)myem.pnn - (int)em->pnn;
1658         }
1659
1660         return cmp > 0;
1661 }
1662
1663 /*
1664   send out an election request
1665  */
1666 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn, bool update_recmaster)
1667 {
1668         int ret;
1669         TDB_DATA election_data;
1670         struct election_message emsg;
1671         uint64_t srvid;
1672         struct ctdb_context *ctdb = rec->ctdb;
1673
1674         srvid = CTDB_SRVID_RECOVERY;
1675
1676         ctdb_election_data(rec, &emsg);
1677
1678         election_data.dsize = sizeof(struct election_message);
1679         election_data.dptr  = (unsigned char *)&emsg;
1680
1681
1682         /* send an election message to all active nodes */
1683         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
1684         ctdb_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
1685
1686
1687         /* A new node that is already frozen has entered the cluster.
1688            The existing nodes are not frozen and don't need to be frozen
1689            until the election has ended and we start the actual recovery
1690         */
1691         if (update_recmaster == true) {
1692                 /* first we assume we will win the election and set 
1693                    recoverymaster to be ourself on the current node
1694                  */
1695                 ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
1696                 if (ret != 0) {
1697                         DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1698                         return -1;
1699                 }
1700         }
1701
1702
1703         return 0;
1704 }
1705
1706 /*
1707   this function will unban all nodes in the cluster
1708 */
1709 static void unban_all_nodes(struct ctdb_context *ctdb)
1710 {
1711         int ret, i;
1712         struct ctdb_node_map *nodemap;
1713         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1714         
1715         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
1716         if (ret != 0) {
1717                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
1718                 return;
1719         }
1720
1721         for (i=0;i<nodemap->num;i++) {
1722                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
1723                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
1724                         ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[i].pnn, 0, NODE_FLAGS_BANNED);
1725                 }
1726         }
1727
1728         talloc_free(tmp_ctx);
1729 }
1730
1731
1732 /*
1733   we think we are winning the election - send a broadcast election request
1734  */
1735 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
1736 {
1737         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1738         int ret;
1739
1740         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb), false);
1741         if (ret != 0) {
1742                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
1743         }
1744
1745         talloc_free(rec->send_election_te);
1746         rec->send_election_te = NULL;
1747 }
1748
1749 /*
1750   handler for memory dumps
1751 */
1752 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1753                              TDB_DATA data, void *private_data)
1754 {
1755         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1756         TDB_DATA *dump;
1757         int ret;
1758         struct rd_memdump_reply *rd;
1759
1760         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1761                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1762                 talloc_free(tmp_ctx);
1763                 return;
1764         }
1765         rd = (struct rd_memdump_reply *)data.dptr;
1766
1767         dump = talloc_zero(tmp_ctx, TDB_DATA);
1768         if (dump == NULL) {
1769                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
1770                 talloc_free(tmp_ctx);
1771                 return;
1772         }
1773         ret = ctdb_dump_memory(ctdb, dump);
1774         if (ret != 0) {
1775                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
1776                 talloc_free(tmp_ctx);
1777                 return;
1778         }
1779
1780         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
1781
1782         ret = ctdb_send_message(ctdb, rd->pnn, rd->srvid, *dump);
1783         if (ret != 0) {
1784                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
1785                 talloc_free(tmp_ctx);
1786                 return;
1787         }
1788
1789         talloc_free(tmp_ctx);
1790 }
1791
1792 /*
1793   handler for reload_nodes
1794 */
1795 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1796                              TDB_DATA data, void *private_data)
1797 {
1798         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1799
1800         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
1801
1802         reload_nodes_file(rec->ctdb);
1803 }
1804
1805
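/*
  timed event callback: re-enable the periodic public ip check by
  releasing the context that disable_ip_check_handler() set up
 */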
1806 static void reenable_ip_check(struct event_context *ev, struct timed_event *te, 
1807                               struct timeval yt, void *p)
1808 {
1809         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1810
1811         talloc_free(rec->ip_check_disable_ctx);
1812         rec->ip_check_disable_ctx = NULL;
1813 }
1814
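/*
  handler for CTDB_SRVID_DISABLE_IP_CHECK: disable the public ip
  allocation check for the number of seconds carried in the message
 */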
1815 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1816                              TDB_DATA data, void *private_data)
1817 {
1818         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1819         uint32_t timeout;
1820
1821         if (rec->ip_check_disable_ctx != NULL) {
1822                 talloc_free(rec->ip_check_disable_ctx);
1823                 rec->ip_check_disable_ctx = NULL;
1824         }
1825
1826         if (data.dsize != sizeof(uint32_t)) {
1827                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data: %lu, "
1828                                  "expecting %lu\n", (long unsigned)data.dsize,
1829                                  (long unsigned)sizeof(uint32_t)));
1830                 return;
1831         }
1832         if (data.dptr == NULL) {
1833                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
1834                 return;
1835         }
1836
1837         timeout = *((uint32_t *)data.dptr);
1838         DEBUG(DEBUG_NOTICE,("Disabling ip check for %u seconds\n", timeout));
1839
1840         rec->ip_check_disable_ctx = talloc_new(rec);
1841         CTDB_NO_MEMORY_VOID(ctdb, rec->ip_check_disable_ctx);
1842
1843         event_add_timed(ctdb->ev, rec->ip_check_disable_ctx, timeval_current_ofs(timeout, 0), reenable_ip_check, rec);
1844 }
1845
1846
1847 /*
1848   handler for ip reallocate, just add it to the list of callers and 
1849   handle this later in the monitor_cluster loop so we do not recurse
1850   with other callers to takeover_run()
1851 */
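/*
  Illustrative sketch (not part of the original code): a caller requests a
  takeover run by sending CTDB_SRVID_TAKEOVER_RUN to the recovery master,
  carrying its own pnn and the srvid it wants the reply on (srvid 0 means
  "no reply wanted"):

      struct rd_memdump_reply rd;
      TDB_DATA data;

      rd.pnn     = ctdb_get_pnn(ctdb);
      rd.srvid   = reply_srvid;        / * hypothetical srvid the caller listens on * /
      data.dptr  = (uint8_t *)&rd;
      data.dsize = sizeof(rd);
      ctdb_send_message(ctdb, recmaster, CTDB_SRVID_TAKEOVER_RUN, data);

  Compare the sender in verify_ip_allocation() further down in this file.
*/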
1852 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1853                              TDB_DATA data, void *private_data)
1854 {
1855         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1856         struct ip_reallocate_list *caller;
1857
1858         if (data.dsize != sizeof(struct rd_memdump_reply)) {
1859                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
1860                 return;
1861         }
1862
1863         if (rec->ip_reallocate_ctx == NULL) {
1864                 rec->ip_reallocate_ctx = talloc_new(rec);
1865                 CTDB_NO_MEMORY_FATAL(ctdb, rec->ip_reallocate_ctx);
1866         }
1867
1868         caller = talloc(rec->ip_reallocate_ctx, struct ip_reallocate_list);
1869         CTDB_NO_MEMORY_FATAL(ctdb, caller);
1870
1871         caller->rd   = (struct rd_memdump_reply *)talloc_steal(caller, data.dptr);
1872         caller->next = rec->reallocate_callers;
1873         rec->reallocate_callers = caller;
1874
1875         return;
1876 }
1877
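/*
  perform the takeover run that was requested via ip_reallocate_handler()
  and send the result back to every caller that asked for a reply
 */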
1878 static void process_ipreallocate_requests(struct ctdb_context *ctdb, struct ctdb_recoverd *rec)
1879 {
1880         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1881         TDB_DATA result;
1882         int32_t ret;
1883         struct ip_reallocate_list *callers;
1884
1885         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
1886         ret = ctdb_takeover_run(ctdb, rec->nodemap);
1887         result.dsize = sizeof(int32_t);
1888         result.dptr  = (uint8_t *)&ret;
1889
1890         for (callers=rec->reallocate_callers; callers; callers=callers->next) {
1891
1892                 /* Someone that sent srvid==0 does not want a reply */
1893                 if (callers->rd->srvid == 0) {
1894                         continue;
1895                 }
1896                 DEBUG(DEBUG_INFO,("Sending ip reallocate reply message to "
1897                                   "%u:%llu\n", (unsigned)callers->rd->pnn,
1898                                   (unsigned long long)callers->rd->srvid));
1899                 ret = ctdb_send_message(ctdb, callers->rd->pnn, callers->rd->srvid, result);
1900                 if (ret != 0) {
1901                         DEBUG(DEBUG_ERR,("Failed to send ip reallocate reply "
1902                                          "message to %u:%llu\n",
1903                                          (unsigned)callers->rd->pnn,
1904                                          (unsigned long long)callers->rd->srvid));
1905                 }
1906         }
1907
1908         talloc_free(tmp_ctx);
1909         talloc_free(rec->ip_reallocate_ctx);
1910         rec->ip_reallocate_ctx = NULL;
1911         rec->reallocate_callers = NULL;
1912         
1913 }
1914
1915
1916 /*
1917   handler for recovery master elections
1918 */
1919 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1920                              TDB_DATA data, void *private_data)
1921 {
1922         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1923         int ret;
1924         struct election_message *em = (struct election_message *)data.dptr;
1925         TALLOC_CTX *mem_ctx;
1926
1927         /* we got an election packet - update the timeout for the election */
1928         talloc_free(rec->election_timeout);
1929         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1930                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1931                                                 ctdb_election_timeout, rec);
1932
1933         mem_ctx = talloc_new(ctdb);
1934
1935         /* someone called an election. check their election data
1936            and if we disagree and we would rather be the elected node, 
1937            send a new election message to all other nodes
1938          */
1939         if (ctdb_election_win(rec, em)) {
1940                 if (!rec->send_election_te) {
1941                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
1942                                                                 timeval_current_ofs(0, 500000),
1943                                                                 election_send_request, rec);
1944                 }
1945                 talloc_free(mem_ctx);
1946                 /*unban_all_nodes(ctdb);*/
1947                 return;
1948         }
1949         
1950         /* we didn't win */
1951         talloc_free(rec->send_election_te);
1952         rec->send_election_te = NULL;
1953
1954         if (ctdb->tunable.verify_recovery_lock != 0) {
1955                 /* release the recmaster lock */
1956                 if (em->pnn != ctdb->pnn &&
1957                     ctdb->recovery_lock_fd != -1) {
1958                         close(ctdb->recovery_lock_fd);
1959                         ctdb->recovery_lock_fd = -1;
1960                         unban_all_nodes(ctdb);
1961                 }
1962         }
1963
1964         /* ok, let that guy become recmaster then */
1965         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
1966         if (ret != 0) {
1967                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
1968                 talloc_free(mem_ctx);
1969                 return;
1970         }
1971
1972         talloc_free(mem_ctx);
1973         return;
1974 }
1975
1976
1977 /*
1978   force the start of the election process
1979  */
1980 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
1981                            struct ctdb_node_map *nodemap)
1982 {
1983         int ret;
1984         struct ctdb_context *ctdb = rec->ctdb;
1985
1986         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
1987
1988         /* set all nodes to recovery mode to stop all internode traffic */
1989         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1990         if (ret != 0) {
1991                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1992                 return;
1993         }
1994
1995         talloc_free(rec->election_timeout);
1996         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
1997                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
1998                                                 ctdb_election_timeout, rec);
1999
2000         ret = send_election_request(rec, pnn, true);
2001         if (ret!=0) {
2002                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2003                 return;
2004         }
2005
2006         /* wait for a few seconds to collect all responses */
2007         ctdb_wait_election(rec);
2008 }
2009
2010
2011
2012 /*
2013   handler for when a node changes its flags
2014 */
2015 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2016                             TDB_DATA data, void *private_data)
2017 {
2018         int ret;
2019         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2020         struct ctdb_node_map *nodemap=NULL;
2021         TALLOC_CTX *tmp_ctx;
2022         uint32_t changed_flags;
2023         int i;
2024         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2025         int disabled_flag_changed;
2026
2027         if (data.dsize != sizeof(*c)) {
2028                 DEBUG(DEBUG_ERR,(__location__ " Invalid data in ctdb_node_flag_change\n"));
2029                 return;
2030         }
2031
2032         tmp_ctx = talloc_new(ctdb);
2033         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2034
2035         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2036         if (ret != 0) {
2037                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2038                 talloc_free(tmp_ctx);
2039                 return;         
2040         }
2041
2042
2043         for (i=0;i<nodemap->num;i++) {
2044                 if (nodemap->nodes[i].pnn == c->pnn) break;
2045         }
2046
2047         if (i == nodemap->num) {
2048                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2049                 talloc_free(tmp_ctx);
2050                 return;
2051         }
2052
2053         changed_flags = c->old_flags ^ c->new_flags;
2054
2055         if (nodemap->nodes[i].flags != c->new_flags) {
2056                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2057         }
2058
2059         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2060
2061         nodemap->nodes[i].flags = c->new_flags;
2062
2063         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2064                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2065
2066         if (ret == 0) {
2067                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2068                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2069         }
2070         
2071         if (ret == 0 &&
2072             ctdb->recovery_master == ctdb->pnn &&
2073             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2074                 /* Only do the takeover run if the perm disabled or unhealthy
2075                    flags changed since these will cause an ip failover but not
2076                    a recovery.
2077                    If the node became disconnected or banned this will also
2078                    lead to an ip address failover but that is handled 
2079                    during recovery
2080                 */
2081                 if (disabled_flag_changed) {
2082                         rec->need_takeover_run = true;
2083                 }
2084         }
2085
2086         talloc_free(tmp_ctx);
2087 }
2088
2089 /*
2090   handler for when we need to push out flag changes to all other nodes
2091 */
2092 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2093                             TDB_DATA data, void *private_data)
2094 {
2095         int ret;
2096         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2097         struct ctdb_node_map *nodemap=NULL;
2098         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2099         uint32_t recmaster;
2100         uint32_t *nodes;
2101
2102         /* find the recovery master */
2103         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2104         if (ret != 0) {
2105                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2106                 talloc_free(tmp_ctx);
2107                 return;
2108         }
2109
2110         /* read the node flags from the recmaster */
2111         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2112         if (ret != 0) {
2113                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recmaster node %u\n", recmaster));
2114                 talloc_free(tmp_ctx);
2115                 return;
2116         }
2117         if (c->pnn >= nodemap->num) {
2118                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2119                 talloc_free(tmp_ctx);
2120                 return;
2121         }
2122
2123         /* send the flags update to all connected nodes */
2124         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2125
2126         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2127                                       nodes, 0, CONTROL_TIMEOUT(),
2128                                       false, data,
2129                                       NULL, NULL,
2130                                       NULL) != 0) {
2131                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2132
2133                 talloc_free(tmp_ctx);
2134                 return;
2135         }
2136
2137         talloc_free(tmp_ctx);
2138 }
2139
2140
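/* state shared by the async getrecmode controls: the number of controls
   still outstanding and the aggregated monitoring result
*/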
2141 struct verify_recmode_normal_data {
2142         uint32_t count;
2143         enum monitor_result status;
2144 };
2145
2146 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2147 {
2148         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2149
2150
2151         /* one more node has responded with recmode data*/
2152         rmdata->count--;
2153
2154         /* if we failed to get the recmode, then return an error and let
2155            the main loop try again.
2156         */
2157         if (state->state != CTDB_CONTROL_DONE) {
2158                 if (rmdata->status == MONITOR_OK) {
2159                         rmdata->status = MONITOR_FAILED;
2160                 }
2161                 return;
2162         }
2163
2164         /* if we got a response, then the recmode will be stored in the
2165            status field
2166         */
2167         if (state->status != CTDB_RECOVERY_NORMAL) {
2168                 DEBUG(DEBUG_NOTICE, (__location__ " Node:%u was in recovery mode. Restart recovery process\n", state->c->hdr.destnode));
2169                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2170         }
2171
2172         return;
2173 }
2174
2175
2176 /* verify that all nodes are in normal recovery mode */
2177 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2178 {
2179         struct verify_recmode_normal_data *rmdata;
2180         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2181         struct ctdb_client_control_state *state;
2182         enum monitor_result status;
2183         int j;
2184         
2185         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2186         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2187         rmdata->count  = 0;
2188         rmdata->status = MONITOR_OK;
2189
2190         /* loop over all active nodes and send an async getrecmode call to 
2191            them*/
2192         for (j=0; j<nodemap->num; j++) {
2193                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2194                         continue;
2195                 }
2196                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2197                                         CONTROL_TIMEOUT(), 
2198                                         nodemap->nodes[j].pnn);
2199                 if (state == NULL) {
2200                         /* we failed to send the control, treat this as 
2201                            an error and try again next iteration
2202                         */                      
2203                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2204                         talloc_free(mem_ctx);
2205                         return MONITOR_FAILED;
2206                 }
2207
2208                 /* set up the callback functions */
2209                 state->async.fn = verify_recmode_normal_callback;
2210                 state->async.private_data = rmdata;
2211
2212                 /* one more control to wait for to complete */
2213                 rmdata->count++;
2214         }
2215
2216
2217         /* now wait for up to the maximum number of seconds allowed
2218            or until all nodes we expect a response from have replied
2219         */
2220         while (rmdata->count > 0) {
2221                 event_loop_once(ctdb->ev);
2222         }
2223
2224         status = rmdata->status;
2225         talloc_free(mem_ctx);
2226         return status;
2227 }
2228
2229
2230 struct verify_recmaster_data {
2231         struct ctdb_recoverd *rec;
2232         uint32_t count;
2233         uint32_t pnn;
2234         enum monitor_result status;
2235 };
2236
2237 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2238 {
2239         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2240
2241
2242         /* one more node has responded with recmaster data*/
2243         rmdata->count--;
2244
2245         /* if we failed to get the recmaster, then return an error and let
2246            the main loop try again.
2247         */
2248         if (state->state != CTDB_CONTROL_DONE) {
2249                 if (rmdata->status == MONITOR_OK) {
2250                         rmdata->status = MONITOR_FAILED;
2251                 }
2252                 return;
2253         }
2254
2255         /* if we got a response, then the recmaster will be stored in the
2256            status field
2257         */
2258         if (state->status != rmdata->pnn) {
2259                 DEBUG(DEBUG_ERR,("Node %d does not agree we are the recmaster. Need a new recmaster election\n", state->c->hdr.destnode));
2260                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2261                 rmdata->status = MONITOR_ELECTION_NEEDED;
2262         }
2263
2264         return;
2265 }
2266
2267
2268 /* verify that all nodes agree that we are the recmaster */
2269 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2270 {
2271         struct ctdb_context *ctdb = rec->ctdb;
2272         struct verify_recmaster_data *rmdata;
2273         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2274         struct ctdb_client_control_state *state;
2275         enum monitor_result status;
2276         int j;
2277         
2278         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
2279         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2280         rmdata->rec    = rec;
2281         rmdata->count  = 0;
2282         rmdata->pnn    = pnn;
2283         rmdata->status = MONITOR_OK;
2284
2285         /* loop over all active nodes and send an async getrecmaster call to 
2286            them*/
2287         for (j=0; j<nodemap->num; j++) {
2288                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2289                         continue;
2290                 }
2291                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
2292                                         CONTROL_TIMEOUT(),
2293                                         nodemap->nodes[j].pnn);
2294                 if (state == NULL) {
2295                         /* we failed to send the control, treat this as 
2296                            an error and try again next iteration
2297                         */                      
2298                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
2299                         talloc_free(mem_ctx);
2300                         return MONITOR_FAILED;
2301                 }
2302
2303                 /* set up the callback functions */
2304                 state->async.fn = verify_recmaster_callback;
2305                 state->async.private_data = rmdata;
2306
2307                 /* one more control to wait for to complete */
2308                 rmdata->count++;
2309         }
2310
2311
2312         /* now wait for up to the maximum number of seconds allowed
2313            or until all nodes we expect a response from have replied
2314         */
2315         while (rmdata->count > 0) {
2316                 event_loop_once(ctdb->ev);
2317         }
2318
2319         status = rmdata->status;
2320         talloc_free(mem_ctx);
2321         return status;
2322 }
2323
2324
2325 /* called to check that the allocation of public ip addresses is ok.
2326 */
2327 static int verify_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn)
2328 {
2329         TALLOC_CTX *mem_ctx = talloc_new(NULL);
2330         struct ctdb_control_get_ifaces *ifaces = NULL;
2331         struct ctdb_all_public_ips *ips = NULL;
2332         struct ctdb_uptime *uptime1 = NULL;
2333         struct ctdb_uptime *uptime2 = NULL;
2334         int ret, j;
2335         bool need_iface_check = false;
2336         bool need_takeover_run = false;
2337
2338         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2339                                 CTDB_CURRENT_NODE, &uptime1);
2340         if (ret != 0) {
2341                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2342                 talloc_free(mem_ctx);
2343                 return -1;
2344         }
2345
2346
2347         /* read the interfaces from the local node */
2348         ret = ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ifaces);
2349         if (ret != 0) {
2350                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", pnn));
2351                 talloc_free(mem_ctx);
2352                 return -1;
2353         }
2354
2355         if (!rec->ifaces) {
2356                 need_iface_check = true;
2357         } else if (rec->ifaces->num != ifaces->num) {
2358                 need_iface_check = true;
2359         } else if (memcmp(rec->ifaces, ifaces, talloc_get_size(ifaces)) != 0) {
2360                 need_iface_check = true;
2361         }
2362
2363         if (need_iface_check) {
2364                 DEBUG(DEBUG_NOTICE, ("The interface status has changed on "
2365                                      "local node %u - force takeover run\n",
2366                                      pnn));
2367                 need_takeover_run = true;
2368         }
2369
2370         /* read the ip allocation from the local node */
2371         ret = ctdb_ctrl_get_public_ips(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, &ips);
2372         if (ret != 0) {
2373                 DEBUG(DEBUG_ERR, ("Unable to get public ips from local node %u\n", pnn));
2374                 talloc_free(mem_ctx);
2375                 return -1;
2376         }
2377
2378         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
2379                                 CTDB_CURRENT_NODE, &uptime2);
2380         if (ret != 0) {
2381                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
2382                 talloc_free(mem_ctx);
2383                 return -1;
2384         }
2385
2386         /* skip the check if the startrecovery time has changed */
2387         if (timeval_compare(&uptime1->last_recovery_started,
2388                             &uptime2->last_recovery_started) != 0) {
2389                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2390                 talloc_free(mem_ctx);
2391                 return 0;
2392         }
2393
2394         /* skip the check if the endrecovery time has changed */
2395         if (timeval_compare(&uptime1->last_recovery_finished,
2396                             &uptime2->last_recovery_finished) != 0) {
2397                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
2398                 talloc_free(mem_ctx);
2399                 return 0;
2400         }
2401
2402         /* skip the check if we have started but not finished recovery */
2403         if (timeval_compare(&uptime1->last_recovery_finished,
2404                             &uptime1->last_recovery_started) != 1) {
2405                 DEBUG(DEBUG_NOTICE, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
2406                 talloc_free(mem_ctx);
2407
2408                 return 0;
2409         }
2410
2411         talloc_free(rec->ifaces);
2412         rec->ifaces = talloc_steal(rec, ifaces);
2413
2414         /* verify that we have the ip addresses we should have
2415            and we don't have ones we shouldn't have.
2416            If we find an inconsistency we ask the recovery master
2417            to trigger a takeover run so that the public addresses
2418            get reallocated correctly.
2419         */
2420         for (j=0; j<ips->num; j++) {
2421                 if (ips->ips[j].pnn == pnn) {
2422                         if (!ctdb_sys_have_ip(&ips->ips[j].addr)) {
2423                                 DEBUG(DEBUG_CRIT,("Public address '%s' is missing and we should serve this ip\n",
2424                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2425                                 need_takeover_run = true;
2426                         }
2427                 } else {
2428                         if (ctdb_sys_have_ip(&ips->ips[j].addr)) {
2429                                 DEBUG(DEBUG_CRIT,("We are still serving a public address '%s' that we should not be serving.\n", 
2430                                         ctdb_addr_to_str(&ips->ips[j].addr)));
2431                                 need_takeover_run = true;
2432                         }
2433                 }
2434         }
2435
2436         if (need_takeover_run) {
2437                 struct takeover_run_reply rd;
2438                 TDB_DATA data;
2439
2440                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
2441
2442                 rd.pnn = ctdb->pnn;
2443                 rd.srvid = 0;
2444                 data.dptr = (uint8_t *)&rd;
2445                 data.dsize = sizeof(rd);
2446
2447                 ret = ctdb_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
2448                 if (ret != 0) {
2449                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
2450                 }
2451         }
2452         talloc_free(mem_ctx);
2453         return 0;
2454 }
2455
2456
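/*
  callback for the async GET_NODEMAP controls: stash the nodemap returned
  by each remote node, indexed by its pnn
 */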
2457 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
2458 {
2459         struct ctdb_node_map **remote_nodemaps = callback_data;
2460
2461         if (node_pnn >= ctdb->num_nodes) {
2462                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
2463                 return;
2464         }
2465
2466         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
2467
2468 }
2469
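/*
  fetch the nodemap from all active remote nodes in parallel
 */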
2470 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
2471         struct ctdb_node_map *nodemap,
2472         struct ctdb_node_map **remote_nodemaps)
2473 {
2474         uint32_t *nodes;
2475
2476         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
2477         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
2478                                         nodes, 0,
2479                                         CONTROL_TIMEOUT(), false, tdb_null,
2480                                         async_getnodemap_callback,
2481                                         NULL,
2482                                         remote_nodemaps) != 0) {
2483                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
2484
2485                 return -1;
2486         }
2487
2488         return 0;
2489 }
2490
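/*
  state for the recovery lock sanity check: a child process reads one byte
  from the reclock file and reports the result back over a pipe, while the
  parent enforces a timeout so that a hung cluster filesystem cannot block
  the recovery daemon
 */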
2491 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
2492 struct ctdb_check_reclock_state {
2493         struct ctdb_context *ctdb;
2494         struct timeval start_time;
2495         int fd[2];
2496         pid_t child;
2497         struct timed_event *te;
2498         struct fd_event *fde;
2499         enum reclock_child_status status;
2500 };
2501
2502 /* when we free the reclock state we must kill any child process.
2503 */
2504 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
2505 {
2506         struct ctdb_context *ctdb = state->ctdb;
2507
2508         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
2509
2510         if (state->fd[0] != -1) {
2511                 close(state->fd[0]);
2512                 state->fd[0] = -1;
2513         }
2514         if (state->fd[1] != -1) {
2515                 close(state->fd[1]);
2516                 state->fd[1] = -1;
2517         }
2518         kill(state->child, SIGKILL);
2519         return 0;
2520 }
2521
2522 /*
2523   called if our check_reclock child times out. this would happen if
2524   i/o to the reclock file blocks.
2525  */
2526 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
2527                                          struct timeval t, void *private_data)
2528 {
2529         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
2530                                            struct ctdb_check_reclock_state);
2531
2532         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out. CFS slow to grant locks?\n"));
2533         state->status = RECLOCK_TIMEOUT;
2534 }
2535
2536 /* this is called when the child process has completed checking the reclock
2537    file and has written data back to us through the pipe.
2538 */
2539 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
2540                              uint16_t flags, void *private_data)
2541 {
2542         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
2543                                              struct ctdb_check_reclock_state);
2544         char c = 0;
2545         int ret;
2546
2547         /* we got a response from our child process so we can abort the
2548            timeout.
2549         */
2550         talloc_free(state->te);
2551         state->te = NULL;
2552
2553         ret = read(state->fd[0], &c, 1);
2554         if (ret != 1 || c != RECLOCK_OK) {
2555                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
2556                 state->status = RECLOCK_FAILED;
2557
2558                 return;
2559         }
2560
2561         state->status = RECLOCK_OK;
2562         return;
2563 }
2564
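/*
  check that the recovery lock file is still readable by forking a child
  that pread()s it and reports back through a pipe; a failed read causes
  the recovery lock fd to be closed and -1 to be returned
 */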
2565 static int check_recovery_lock(struct ctdb_context *ctdb)
2566 {
2567         int ret;
2568         struct ctdb_check_reclock_state *state;
2569         pid_t parent = getpid();
2570
2571         if (ctdb->recovery_lock_fd == -1) {
2572                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
2573                 return -1;
2574         }
2575
2576         state = talloc(ctdb, struct ctdb_check_reclock_state);
2577         CTDB_NO_MEMORY(ctdb, state);
2578
2579         state->ctdb = ctdb;
2580         state->start_time = timeval_current();
2581         state->status = RECLOCK_CHECKING;
2582         state->fd[0] = -1;
2583         state->fd[1] = -1;
2584
2585         ret = pipe(state->fd);
2586         if (ret != 0) {
2587                 talloc_free(state);
2588                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
2589                 return -1;
2590         }
2591
2592         state->child = fork();
2593         if (state->child == (pid_t)-1) {
2594                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
2595                 close(state->fd[0]);
2596                 state->fd[0] = -1;
2597                 close(state->fd[1]);
2598                 state->fd[1] = -1;
2599                 talloc_free(state);
2600                 return -1;
2601         }
2602
2603         if (state->child == 0) {
2604                 char cc = RECLOCK_OK;
2605                 close(state->fd[0]);
2606                 state->fd[0] = -1;
2607
2608                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
2609                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
2610                         cc = RECLOCK_FAILED;
2611                 }
2612
2613                 write(state->fd[1], &cc, 1);
2614                 /* make sure we die when our parent dies */
2615                 while (kill(parent, 0) == 0 || errno != ESRCH) {
2616                         sleep(5);
2617                         write(state->fd[1], &cc, 1);
2618                 }
2619                 _exit(0);
2620         }
2621         close(state->fd[1]);
2622         state->fd[1] = -1;
2623         set_close_on_exec(state->fd[0]);
2624
2625         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
2626
2627         talloc_set_destructor(state, check_reclock_destructor);
2628
2629         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
2630                                     ctdb_check_reclock_timeout, state);
2631         if (state->te == NULL) {
2632                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
2633                 talloc_free(state);
2634                 return -1;
2635         }
2636
2637         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
2638                                 EVENT_FD_READ|EVENT_FD_AUTOCLOSE,
2639                                 reclock_child_handler,
2640                                 (void *)state);
2641
2642         if (state->fde == NULL) {
2643                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
2644                 talloc_free(state);
2645                 return -1;
2646         }
2647
2648         while (state->status == RECLOCK_CHECKING) {
2649                 event_loop_once(ctdb->ev);
2650         }
2651
2652         if (state->status == RECLOCK_FAILED) {
2653                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
2654                 close(ctdb->recovery_lock_fd);
2655                 ctdb->recovery_lock_fd = -1;
2656                 talloc_free(state);
2657                 return -1;
2658         }
2659
2660         talloc_free(state);
2661         return 0;
2662 }
2663
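/*
  ask the local daemon for the current reclock file setting and update our
  copy; if the setting changed or the reclock file was disabled, close any
  open recovery lock fd
 */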
2664 static int update_recovery_lock_file(struct ctdb_context *ctdb)
2665 {
2666         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
2667         const char *reclockfile;
2668
2669         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
2670                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
2671                 talloc_free(tmp_ctx);
2672                 return -1;      
2673         }
2674
2675         if (reclockfile == NULL) {
2676                 if (ctdb->recovery_lock_file != NULL) {
2677                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
2678                         talloc_free(ctdb->recovery_lock_file);
2679                         ctdb->recovery_lock_file = NULL;
2680                         if (ctdb->recovery_lock_fd != -1) {
2681                                 close(ctdb->recovery_lock_fd);
2682                                 ctdb->recovery_lock_fd = -1;
2683                         }
2684                 }
2685                 ctdb->tunable.verify_recovery_lock = 0;
2686                 talloc_free(tmp_ctx);
2687                 return 0;
2688         }
2689
2690         if (ctdb->recovery_lock_file == NULL) {
2691                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2692                 if (ctdb->recovery_lock_fd != -1) {
2693                         close(ctdb->recovery_lock_fd);
2694                         ctdb->recovery_lock_fd = -1;
2695                 }
2696                 talloc_free(tmp_ctx);
2697                 return 0;
2698         }
2699
2700
2701         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
2702                 talloc_free(tmp_ctx);
2703                 return 0;
2704         }
2705
2706         talloc_free(ctdb->recovery_lock_file);
2707         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
2708         ctdb->tunable.verify_recovery_lock = 0;
2709         if (ctdb->recovery_lock_fd != -1) {
2710                 close(ctdb->recovery_lock_fd);
2711                 ctdb->recovery_lock_fd = -1;
2712         }
2713
2714         talloc_free(tmp_ctx);
2715         return 0;
2716 }
2717                 
2718 /*
2719   the main monitoring loop
2720  */
2721 static void monitor_cluster(struct ctdb_context *ctdb)
2722 {
2723         uint32_t pnn;
2724         TALLOC_CTX *mem_ctx=NULL;
2725         struct ctdb_node_map *nodemap=NULL;
2726         struct ctdb_node_map *recmaster_nodemap=NULL;
2727         struct ctdb_node_map **remote_nodemaps=NULL;
2728         struct ctdb_vnn_map *vnnmap=NULL;
2729         struct ctdb_vnn_map *remote_vnnmap=NULL;
2730         int32_t debug_level;
2731         int i, j, ret;
2732         struct ctdb_recoverd *rec;
2733
2734         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
2735
2736         rec = talloc_zero(ctdb, struct ctdb_recoverd);
2737         CTDB_NO_MEMORY_FATAL(ctdb, rec);
2738
2739         rec->ctdb = ctdb;
2740
2741         rec->priority_time = timeval_current();
2742
2743         /* register a message port for sending memory dumps */
2744         ctdb_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
2745
2746         /* register a message port for recovery elections */
2747         ctdb_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
2748
2749         /* when nodes are disabled/enabled */
2750         ctdb_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
2751
2752         /* when we are asked to push out a flag change */
2753         ctdb_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
2754
2755         /* register a message port for vacuum fetch */
2756         ctdb_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
2757
2758         /* register a message port for reloadnodes  */
2759         ctdb_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
2760
2761         /* register a message port for performing a takeover run */
2762         ctdb_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
2763
2764         /* register a message port for disabling the ip check for a short while */
2765         ctdb_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
2766
2767 again:
2768         if (mem_ctx) {
2769                 talloc_free(mem_ctx);
2770                 mem_ctx = NULL;
2771         }
2772         mem_ctx = talloc_new(ctdb);
2773         if (!mem_ctx) {
2774                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temporary context\n"));
2775                 exit(-1);
2776         }
2777
2778         /* we only check for recovery once every recover_interval seconds */
2779         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval);
2780
2781         /* verify that the main daemon is still running */
2782         if (kill(ctdb->ctdbd_pid, 0) != 0) {
2783                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
2784                 exit(-1);
2785         }
2786
2787         /* ping the local daemon to tell it we are alive */
2788         ctdb_ctrl_recd_ping(ctdb);
2789
2790         if (rec->election_timeout) {
2791                 /* an election is in progress */
2792                 goto again;
2793         }
2794
2795         /* read the debug level from the parent and update locally */
2796         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
2797         if (ret !=0) {
2798                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
2799                 goto again;
2800         }
2801         LogLevel = debug_level;
2802
2803
2804         /* We must check if we need to ban a node here, but we want to do this
2805            as early as possible so we don't wait until we have pulled the node
2806            map from the local node. That is why we use the hardcoded threshold of 20.
2807         */
2808         for (i=0; i<ctdb->num_nodes; i++) {
2809                 struct ctdb_banning_state *ban_state;
2810
2811                 if (ctdb->nodes[i]->ban_state == NULL) {
2812                         continue;
2813                 }
2814                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
2815                 if (ban_state->count < 20) {
2816                         continue;
2817                 }
2818                 DEBUG(DEBUG_NOTICE,("Node %u has caused %u recoveries recently - banning it for %u seconds\n",
2819                         ctdb->nodes[i]->pnn, ban_state->count,
2820                         ctdb->tunable.recovery_ban_period));
2821                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
2822                 ban_state->count = 0;
2823         }
2824
2825         /* get relevant tunables */
2826         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
2827         if (ret != 0) {
2828                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
2829                 goto again;
2830         }
2831
2832         /* get the current recovery lock file from the server */
2833         if (update_recovery_lock_file(ctdb) != 0) {
2834                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
2835                 goto again;
2836         }
2837
2838         /* Make sure that if recovery lock verification becomes disabled,
2839            we close the file
2840         */
2841         if (ctdb->tunable.verify_recovery_lock == 0) {
2842                 if (ctdb->recovery_lock_fd != -1) {
2843                         close(ctdb->recovery_lock_fd);
2844                         ctdb->recovery_lock_fd = -1;
2845                 }
2846         }
2847
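             /* ask the local daemon for our own physical node number (pnn) */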
2848         pnn = ctdb_ctrl_getpnn(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
2849         if (pnn == (uint32_t)-1) {
2850                 DEBUG(DEBUG_ERR,("Failed to get local pnn - retrying\n"));
2851                 goto again;
2852         }
2853
2854         /* get the vnnmap */
2855         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
2856         if (ret != 0) {
2857                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
2858                 goto again;
2859         }
2860
2861
2862         /* get number of nodes */
2863         if (rec->nodemap) {
2864                 talloc_free(rec->nodemap);
2865                 rec->nodemap = NULL;
2866                 nodemap=NULL;
2867         }
2868         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
2869         if (ret != 0) {
2870                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
2871                 goto again;
2872         }
2873         nodemap = rec->nodemap;
2874
2875         /* check which node is the recovery master */
2876         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
2877         if (ret != 0) {
2878                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
2879                 goto again;
2880         }
2881
2882         /* if we are not the recmaster we can safely ignore any ip reallocate requests */
2883         if (rec->recmaster != pnn) {
2884                 if (rec->ip_reallocate_ctx != NULL) {
2885                         talloc_free(rec->ip_reallocate_ctx);
2886                         rec->ip_reallocate_ctx = NULL;
2887                         rec->reallocate_callers = NULL;
2888                 }
2889         }
2890         /* if there are takeover runs requested, perform them and notify the waiters */
2891         if (rec->reallocate_callers) {
2892                 process_ipreallocate_requests(ctdb, rec);
2893         }
2894
2895         if (rec->recmaster == (uint32_t)-1) {
2896                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
2897                 force_election(rec, pnn, nodemap);
2898                 goto again;
2899         }
2900
2901
2902         /* if the local daemon is STOPPED, we verify that the databases are
2903            also frozen and that the recmode is set to active
2904         */
2905         if (nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) {
2906                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2907                 if (ret != 0) {
2908                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
2909                 }
2910                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2911                         DEBUG(DEBUG_ERR,("Node is stopped but recovery mode is not active. Activate recovery mode and lock databases\n"));
2912
2913                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
2914                         if (ret != 0) {
2915                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node due to node being STOPPED\n"));
2916                                 goto again;
2917                         }
2918                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
2919                         if (ret != 0) {
2920                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode due to node being stopped\n"));
2921
2922                                 goto again;
2923                         }
2924                         goto again;
2925                 }
2926         }
2927         /* If the local node is stopped and we happen to be the recmaster,
2928            yield that role
2929         */
2930         if ((nodemap->nodes[pnn].flags & NODE_FLAGS_STOPPED) && (rec->recmaster == pnn)) {
2931                 DEBUG(DEBUG_ERR,("Local node is STOPPED. Yielding recmaster role\n"));
2932                 force_election(rec, pnn, nodemap);
2933                 goto again;
2934         }
2935         
2936         /* check that we (recovery daemon) and the local ctdb daemon
2937            agree on whether we are banned or not
2938         */
2939 //qqq
2940
2941         /* remember our own node flags */
2942         rec->node_flags = nodemap->nodes[pnn].flags;
2943
2944         /* count how many active nodes there are */
2945         rec->num_active    = 0;
2946         rec->num_connected = 0;
2947         for (i=0; i<nodemap->num; i++) {
2948                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
2949                         rec->num_active++;
2950                 }
2951                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2952                         rec->num_connected++;
2953                 }
2954         }
2955
2956
2957         /* verify that the recmaster node is still active */
2958         for (j=0; j<nodemap->num; j++) {
2959                 if (nodemap->nodes[j].pnn==rec->recmaster) {
2960                         break;
2961                 }
2962         }
2963
2964         if (j == nodemap->num) {
2965                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
2966                 force_election(rec, pnn, nodemap);
2967                 goto again;
2968         }
2969
2970         /* if recovery master is disconnected we must elect a new recmaster */
2971         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
2972                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
2973                 force_election(rec, pnn, nodemap);
2974                 goto again;
2975         }
2976
2977         /* grab the nodemap from the recovery master to check if it is banned */
2978         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
2979                                    mem_ctx, &recmaster_nodemap);
2980         if (ret != 0) {
2981                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
2982                           nodemap->nodes[j].pnn));
2983                 goto again;
2984         }
2985
2986
2987         if (recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2988                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
2989                 force_election(rec, pnn, nodemap);
2990                 goto again;
2991         }
2992
2993
2994         /* verify that we have all ip addresses we should have and we don't
2995          * have addresses we shouldn't have.
2996          */
2997         if (ctdb->do_checkpublicip) {
2998                 if (rec->ip_check_disable_ctx == NULL) {
2999                         if (verify_ip_allocation(ctdb, rec, pnn) != 0) {
3000                                 DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3001                         }
3002                 }
3003         }
3004
3005
3006         /* if we are not the recmaster then we do not need to check
3007            if recovery is needed
3008          */
3009         if (pnn != rec->recmaster) {
3010                 goto again;
3011         }
3012
3013
3014         /* ensure our local copies of flags are right */
3015         ret = update_local_flags(rec, nodemap);
3016         if (ret == MONITOR_ELECTION_NEEDED) {
3017                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3018                 force_election(rec, pnn, nodemap);
3019                 goto again;
3020         }
3021         if (ret != MONITOR_OK) {
3022                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3023                 goto again;
3024         }
3025
3026         /* update the list of public ips that a node can handle for
3027            all connected nodes
3028         */
3029         if (ctdb->num_nodes != nodemap->num) {
3030                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3031                 reload_nodes_file(ctdb);
3032                 goto again;
3033         }
3034         for (j=0; j<nodemap->num; j++) {
3035                 /* release any existing data */
3036                 if (ctdb->nodes[j]->known_public_ips) {
3037                         talloc_free(ctdb->nodes[j]->known_public_ips);
3038                         ctdb->nodes[j]->known_public_ips = NULL;
3039                 }
3040                 if (ctdb->nodes[j]->available_public_ips) {
3041                         talloc_free(ctdb->nodes[j]->available_public_ips);
3042                         ctdb->nodes[j]->available_public_ips = NULL;
3043                 }
3044
3045                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3046                         continue;
3047                 }
3048
3049                 /* grab a new shiny list of known public ips from the node */
3050                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
3051                                         CONTROL_TIMEOUT(),
3052                                         ctdb->nodes[j]->pnn,
3053                                         ctdb->nodes,
3054                                         0,
3055                                         &ctdb->nodes[j]->known_public_ips);
3056                 if (ret != 0) {
3057                         DEBUG(DEBUG_ERR,("Failed to read known public ips from node : %u\n",
3058                                 ctdb->nodes[j]->pnn));
3059                         goto again;
3060                 }
3061
3062                 /* grab a new shiny list of available public ips from the node */
3063                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
3064                                         CONTROL_TIMEOUT(),
3065                                         ctdb->nodes[j]->pnn,
3066                                         ctdb->nodes,
3067                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
3068                                         &ctdb->nodes[j]->available_public_ips);
3069                 if (ret != 0) {
3070                         DEBUG(DEBUG_ERR,("Failed to read available public ips from node : %u\n",
3071                                 ctdb->nodes[j]->pnn));
3072                         goto again;
3073                 }
3074         }
3075
3076
3077         /* verify that all active nodes agree that we are the recmaster */
3078         switch (verify_recmaster(rec, nodemap, pnn)) {
3079         case MONITOR_RECOVERY_NEEDED:
3080                 /* can not happen */
3081                 goto again;
3082         case MONITOR_ELECTION_NEEDED:
3083                 force_election(rec, pnn, nodemap);
3084                 goto again;
3085         case MONITOR_OK:
3086                 break;
3087         case MONITOR_FAILED:
3088                 goto again;
3089         }
3090
3091
3092         if (rec->need_recovery) {
3093                 /* a previous recovery didn't finish */
3094                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3095                 goto again;             
3096         }
3097
3098         /* verify that all active nodes are in normal mode 
3099            and not in recovery mode 
3100         */
3101         switch (verify_recmode(ctdb, nodemap)) {
3102         case MONITOR_RECOVERY_NEEDED:
3103                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3104                 goto again;
3105         case MONITOR_FAILED:
3106                 goto again;
3107         case MONITOR_ELECTION_NEEDED:
3108                 /* can not happen */
3109         case MONITOR_OK:
3110                 break;
3111         }
3112
3113
3114         if (ctdb->tunable.verify_recovery_lock != 0) {
3115                 /* we should have the reclock - check it is not stale */
3116                 ret = check_recovery_lock(ctdb);
3117                 if (ret != 0) {
3118                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3119                         ctdb_set_culprit(rec, ctdb->pnn);
3120                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3121                         goto again;
3122                 }
3123         }
3124
3125         /* get the nodemap for all active remote nodes
3126          */
3127         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3128         if (remote_nodemaps == NULL) {
3129                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3130                 goto again;
3131         }
3132         for(i=0; i<nodemap->num; i++) {
3133                 remote_nodemaps[i] = NULL;
3134         }
3135         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3136                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3137                 goto again;
3138         } 
3139
3140         /* verify that all other nodes have the same nodemap as we have
3141         */
3142         for (j=0; j<nodemap->num; j++) {
3143                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3144                         continue;
3145                 }
3146
3147                 if (remote_nodemaps[j] == NULL) {
3148                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3149                         ctdb_set_culprit(rec, j);
3150
3151                         goto again;
3152                 }
3153
3154                 /* if the nodes disagree on how many nodes there are
3155                    then this is a good reason to try recovery
3156                  */
3157                 if (remote_nodemaps[j]->num != nodemap->num) {
3158                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3159                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3160                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3161                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3162                         goto again;
3163                 }
3164
3165                 /* if the nodes disagree on which nodes exist and are
3166                    active, then that is also a good reason to do recovery
3167                  */
3168                 for (i=0;i<nodemap->num;i++) {
3169                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3170                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3171                                           nodemap->nodes[j].pnn, i, 
3172                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3173                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3174                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3175                                             vnnmap);
3176                                 goto again;
3177                         }
3178                 }
3179
3180                 /* verify the flags are consistent
3181                 */
3182                 for (i=0; i<nodemap->num; i++) {
3183                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3184                                 continue;
3185                         }
3186                         
3187                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3188                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3189                                   nodemap->nodes[j].pnn, 
3190                                   nodemap->nodes[i].pnn, 
3191                                   remote_nodemaps[j]->nodes[i].flags,
3192                                   nodemap->nodes[i].flags));
3193                                 if (i == j) {
3194                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3195                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3196                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3197                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3198                                                     vnnmap);
3199                                         goto again;
3200                                 } else {
3201                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3202                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3203                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3204                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3205                                                     vnnmap);
3206                                         goto again;
3207                                 }
3208                         }
3209                 }
3210         }
3211
3212
3213         /* there had better be the same number of lmasters in the vnnmap
3214            as there are active nodes, or we will have to do a recovery
3215          */
3216         if (vnnmap->size != rec->num_active) {
3217                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active nodes. %u vs %u\n", 
3218                           vnnmap->size, rec->num_active));
3219                 ctdb_set_culprit(rec, ctdb->pnn);
3220                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3221                 goto again;
3222         }
3223
3224         /* verify that all active nodes in the nodemap also exist in 
3225            the vnnmap.
3226          */
3227         for (j=0; j<nodemap->num; j++) {
3228                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3229                         continue;
3230                 }
3231                 if (nodemap->nodes[j].pnn == pnn) {
3232                         continue;
3233                 }
3234
3235                 for (i=0; i<vnnmap->size; i++) {
3236                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3237                                 break;
3238                         }
3239                 }
3240                 if (i == vnnmap->size) {
3241                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3242                                   nodemap->nodes[j].pnn));
3243                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3244                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3245                         goto again;
3246                 }
3247         }
3248
3249         
3250         /* verify that all other nodes have the same vnnmap
3251            and are from the same generation
3252          */
3253         for (j=0; j<nodemap->num; j++) {
3254                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3255                         continue;
3256                 }
3257                 if (nodemap->nodes[j].pnn == pnn) {
3258                         continue;
3259                 }
3260
3261                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3262                                           mem_ctx, &remote_vnnmap);
3263                 if (ret != 0) {
3264                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3265                                   nodemap->nodes[j].pnn));
3266                         goto again;
3267                 }
3268
3269                 /* verify the vnnmap generation is the same */
3270                 if (vnnmap->generation != remote_vnnmap->generation) {
3271                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3272                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3273                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3274                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3275                         goto again;
3276                 }
3277
3278                 /* verify the vnnmap size is the same */
3279                 if (vnnmap->size != remote_vnnmap->size) {
3280                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3281                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3282                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3283                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3284                         goto again;
3285                 }
3286
3287                 /* verify the vnnmap is the same */
3288                 for (i=0;i<vnnmap->size;i++) {
3289                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3290                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3291                                           nodemap->nodes[j].pnn));
3292                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3293                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3294                                             vnnmap);
3295                                 goto again;
3296                         }
3297                 }
3298         }
3299
3300         /* we might need to change who has what IP assigned */
3301         if (rec->need_takeover_run) {
3302                 rec->need_takeover_run = false;
3303
3304                 /* execute the "startrecovery" event script on all nodes */
3305                 ret = run_startrecovery_eventscript(rec, nodemap);
3306                 if (ret!=0) {
3307                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3308                         ctdb_set_culprit(rec, ctdb->pnn);
3309                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3310                         goto again;
3311                 }
3312
3313                 ret = ctdb_takeover_run(ctdb, nodemap);
3314                 if (ret != 0) {
3315                         DEBUG(DEBUG_ERR, (__location__ " Unable to setup public takeover addresses - starting recovery\n"));
3316                         ctdb_set_culprit(rec, ctdb->pnn);
3317                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3318                         goto again;
3319                 }
3320
3321                 /* execute the "recovered" event script on all nodes */
3322                 ret = run_recovered_eventscript(ctdb, nodemap, "monitor_cluster");
3323 #if 0
3324 // we can't check whether the event completed successfully
3325 // since this script WILL fail if the node is in recovery mode,
3326 // and if that race happens, the code here would just cause a second
3327 // cascading recovery.
3328                 if (ret!=0) {
3329                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3330                         ctdb_set_culprit(rec, ctdb->pnn);
3331                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3332                 }
3333 #endif
3334         }
3335
3336
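             /* nothing more to do this iteration - go around the monitoring
                loop again */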
3337         goto again;
3338
3339 }
3340
3341 /*
3342   event handler for when the main ctdbd dies
3343  */
3344 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
3345                                  uint16_t flags, void *private_data)
3346 {
3347         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
3348         _exit(1);
3349 }
3350
3351 /*
3352   called regularly to verify that the recovery daemon is still running
3353  */
3354 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
3355                               struct timeval yt, void *p)
3356 {
3357         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
3358
3359         if (kill(ctdb->recoverd_pid, 0) != 0) {
3360                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Shutting down main daemon\n", (int)ctdb->recoverd_pid));
3361
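                     /* perform an orderly shutdown of the main daemon: stop the
                        recovery daemon, keepalives and monitoring, release all
                        public IPs, shut down the transport and run the
                        "shutdown" event before exiting */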
3362                 ctdb_stop_recoverd(ctdb);
3363                 ctdb_stop_keepalive(ctdb);
3364                 ctdb_stop_monitoring(ctdb);
3365                 ctdb_release_all_ips(ctdb);
3366                 if (ctdb->methods != NULL) {
3367                         ctdb->methods->shutdown(ctdb);
3368                 }
3369                 ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
3370
3371                 exit(10);       
3372         }
3373
3374         event_add_timed(ctdb->ev, ctdb, 
3375                         timeval_current_ofs(30, 0),
3376                         ctdb_check_recd, ctdb);
3377 }
3378
3379 static void recd_sig_child_handler(struct event_context *ev,
3380         struct signal_event *se, int signum, int count,
3381         void *dont_care, 
3382         void *private_data)
3383 {
3384 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
3385         int status;
3386         pid_t pid = -1;
3387
3388         while (pid != 0) {
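             /* reap all exited children without blocking so the recovery
                daemon does not leave zombie processes behind */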
3389                 pid = waitpid(-1, &status, WNOHANG);
3390                 if (pid == -1) {
3391                         if (errno != ECHILD) {
3392                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
3393                         }
3394                         return;
3395                 }
3396                 if (pid > 0) {
3397                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
3398                 }
3399         }
3400 }
3401
3402 /*
3403   startup the recovery daemon as a child of the main ctdb daemon
3404  */
3405 int ctdb_start_recoverd(struct ctdb_context *ctdb)
3406 {
3407         int fd[2];
3408         struct signal_event *se;
3409
3410         if (pipe(fd) != 0) {
3411                 return -1;
3412         }
3413
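             /* remember the pid of the main daemon so the recovery daemon
                can verify it is still running */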
3414         ctdb->ctdbd_pid = getpid();
3415
3416         ctdb->recoverd_pid = fork();
3417         if (ctdb->recoverd_pid == -1) {
3418                 return -1;
3419         }
3420         
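             /* parent: close the read end of the pipe and periodically check
                that the recovery daemon child is still alive */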
3421         if (ctdb->recoverd_pid != 0) {
3422                 close(fd[0]);
3423                 event_add_timed(ctdb->ev, ctdb, 
3424                                 timeval_current_ofs(30, 0),
3425                                 ctdb_check_recd, ctdb);
3426                 return 0;
3427         }
3428
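             /* child: close the write end and keep the read end open; when the
                main daemon exits, the read end becomes readable (EOF) and
                ctdb_recoverd_parent() will terminate this process */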
3429         close(fd[1]);
3430
3431         srandom(getpid() ^ time(NULL));
3432
3433         if (switch_from_server_to_client(ctdb) != 0) {
3434                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
3435                 exit(1);
3436         }
3437
3438         DEBUG(DEBUG_NOTICE, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
3439
3440         event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
3441                      ctdb_recoverd_parent, &fd[0]);     
3442
3443         /* set up a handler to pick up sigchld */
3444         se = event_add_signal(ctdb->ev, ctdb,
3445                                      SIGCHLD, 0,
3446                                      recd_sig_child_handler,
3447                                      ctdb);
3448         if (se == NULL) {
3449                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
3450                 exit(1);
3451         }
3452
3453         monitor_cluster(ctdb);
3454
3455         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
3456         return -1;
3457 }
3458
3459 /*
3460   shutdown the recovery daemon
3461  */
3462 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
3463 {
3464         if (ctdb->recoverd_pid == 0) {
3465                 return;
3466         }
3467
3468         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
3469         kill(ctdb->recoverd_pid, SIGTERM);
3470 }