1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "dlinklist.h"
31
32
33 /* List of SRVID requests that need to be processed */
34 struct srvid_list {
35         struct srvid_list *next, *prev;
36         struct srvid_request *request;
37 };
38
39 struct srvid_requests {
40         struct srvid_list *requests;
41 };
42
43 static void srvid_request_reply(struct ctdb_context *ctdb,
44                                 struct srvid_request *request,
45                                 TDB_DATA result)
46 {
47         /* Someone that sent srvid==0 does not want a reply */
48         if (request->srvid == 0) {
49                 talloc_free(request);
50                 return;
51         }
52
53         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
54                                      result) == 0) {
55                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
56                                   (unsigned)request->pnn,
57                                   (unsigned long long)request->srvid));
58         } else {
59                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
60                                  (unsigned)request->pnn,
61                                  (unsigned long long)request->srvid));
62         }
63
64         talloc_free(request);
65 }
66
67 static void srvid_requests_reply(struct ctdb_context *ctdb,
68                                  struct srvid_requests **requests,
69                                  TDB_DATA result)
70 {
71         struct srvid_list *r;
72
73         for (r = (*requests)->requests; r != NULL; r = r->next) {
74                 srvid_request_reply(ctdb, r->request, result);
75         }
76
77         /* Free the list structure... */
78         TALLOC_FREE(*requests);
79 }
80
81 static void srvid_request_add(struct ctdb_context *ctdb,
82                               struct srvid_requests **requests,
83                               struct srvid_request *request)
84 {
85         struct srvid_list *t;
86         int32_t ret;
87         TDB_DATA result;
88
89         if (*requests == NULL) {
90                 *requests = talloc_zero(ctdb, struct srvid_requests);
91                 if (*requests == NULL) {
92                         goto nomem;
93                 }
94         }
95
96         t = talloc_zero(*requests, struct srvid_list);
97         if (t == NULL) {
98                 /* If *requests was just allocated above then free it */
99                 if ((*requests)->requests == NULL) {
100                         TALLOC_FREE(*requests);
101                 }
102                 goto nomem;
103         }
104
105         t->request = (struct srvid_request *)talloc_steal(t, request);
106         DLIST_ADD((*requests)->requests, t);
107
108         return;
109
110 nomem:
111         /* Failed to add the request to the list.  Send a fail. */
112         DEBUG(DEBUG_ERR, (__location__
113                           " Out of memory, failed to queue SRVID request\n"));
114         ret = -ENOMEM;
115         result.dsize = sizeof(ret);
116         result.dptr = (uint8_t *)&ret;
117         srvid_request_reply(ctdb, request, result);
118 }
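
/*
 * Usage sketch (not lifted verbatim from this file; names are the ones
 * used elsewhere here): a handler that cannot answer a client immediately
 * parks the incoming struct srvid_request and replies once the deferred
 * work has finished, roughly:
 *
 *     srvid_request_add(ctdb, &rec->reallocate_requests, request);
 *     ...later, when the deferred operation completes...
 *     int32_t ok = 0;
 *     TDB_DATA result = { .dptr = (uint8_t *)&ok, .dsize = sizeof(ok) };
 *     srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
 *
 * srvid_requests_reply() answers every queued pnn/srvid pair and frees the
 * list, leaving the pointer NULL and ready for the next batch of requests.
 */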
119
120 struct ctdb_banning_state {
121         uint32_t count;
122         struct timeval last_reported_time;
123 };
124
125 /*
126   private state of recovery daemon
127  */
128 struct ctdb_recoverd {
129         struct ctdb_context *ctdb;
130         uint32_t recmaster;
131         uint32_t num_active;
132         uint32_t num_lmasters;
133         uint32_t num_connected;
134         uint32_t last_culprit_node;
135         struct ctdb_node_map *nodemap;
136         struct timeval priority_time;
137         bool need_takeover_run;
138         bool need_recovery;
139         uint32_t node_flags;
140         struct timed_event *send_election_te;
141         struct timed_event *election_timeout;
142         struct vacuum_info *vacuum_info;
143         struct srvid_requests *reallocate_requests;
144         bool takeover_run_in_progress;
145         TALLOC_CTX *takeover_runs_disable_ctx;
146         struct ctdb_control_get_ifaces *ifaces;
147         uint32_t *force_rebalance_nodes;
148 };
149
150 #define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
151 #define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)
152
153 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data);
154
155 /*
156   ban a node for a period of time
157  */
158 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
159 {
160         int ret;
161         struct ctdb_context *ctdb = rec->ctdb;
162         struct ctdb_ban_time bantime;
163        
164         if (!ctdb_validate_pnn(ctdb, pnn)) {
165                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
166                 return;
167         }
168
169         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
170
171         bantime.pnn  = pnn;
172         bantime.time = ban_time;
173
174         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
175         if (ret != 0) {
176                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
177                 return;
178         }
179
180 }
181
182 enum monitor_result { MONITOR_OK, MONITOR_RECOVERY_NEEDED, MONITOR_ELECTION_NEEDED, MONITOR_FAILED};
183
184
185 /*
186   remember the trouble maker
187  */
188 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
191         struct ctdb_banning_state *ban_state;
192
193         if (culprit >= ctdb->num_nodes) {
194                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
195                 return;
196         }
197
198         /* If we are banned or stopped, do not set other nodes as culprits */
199         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
200                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
201                 return;
202         }
203
204         if (ctdb->nodes[culprit]->ban_state == NULL) {
205                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
206                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
207
208                 
209         }
210         ban_state = ctdb->nodes[culprit]->ban_state;
211         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
212                 /* this was the first time in a long while this node
213                    misbehaved so we will forgive any old transgressions.
214                 */
215                 ban_state->count = 0;
216         }
217
218         ban_state->count += count;
219         ban_state->last_reported_time = timeval_current();
220         rec->last_culprit_node = culprit;
221 }
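
/*
 * A rough worked example of the credit accounting above (the ban threshold
 * itself is applied in the main loop, outside this excerpt): failing an
 * event script costs a node 1 credit (see the *_fail_callback functions
 * below), while failing to freeze or to start the recovery transaction
 * costs nodemap->num credits in one go, so a single freeze failure weighs
 * as much as a full round of event failures.  If a node then behaves for
 * longer than the recovery_grace_period tunable, its count is reset to 0
 * before the next credit is added.
 */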
222
223 /*
224   remember the trouble maker
225  */
226 static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
227 {
228         ctdb_set_culprit_count(rec, culprit, 1);
229 }
230
231
232 /* this callback is called for every node that failed to execute the
233    recovered event
234 */
235 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
236 {
237         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
238
239         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
240
241         ctdb_set_culprit(rec, node_pnn);
242 }
243
244 /*
245   run the "recovered" eventscript on all nodes
246  */
247 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
248 {
249         TALLOC_CTX *tmp_ctx;
250         uint32_t *nodes;
251         struct ctdb_context *ctdb = rec->ctdb;
252
253         tmp_ctx = talloc_new(ctdb);
254         CTDB_NO_MEMORY(ctdb, tmp_ctx);
255
256         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
257         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
258                                         nodes, 0,
259                                         CONTROL_TIMEOUT(), false, tdb_null,
260                                         NULL, recovered_fail_callback,
261                                         rec) != 0) {
262                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
263
264                 talloc_free(tmp_ctx);
265                 return -1;
266         }
267
268         talloc_free(tmp_ctx);
269         return 0;
270 }
271
272 /* this callback is called for every node that failed to execute the
273    start recovery event
274 */
275 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
276 {
277         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
278
279         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
280
281         ctdb_set_culprit(rec, node_pnn);
282 }
283
284 /*
285   run the "startrecovery" eventscript on all nodes
286  */
287 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
288 {
289         TALLOC_CTX *tmp_ctx;
290         uint32_t *nodes;
291         struct ctdb_context *ctdb = rec->ctdb;
292
293         tmp_ctx = talloc_new(ctdb);
294         CTDB_NO_MEMORY(ctdb, tmp_ctx);
295
296         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
297         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
298                                         nodes, 0,
299                                         CONTROL_TIMEOUT(), false, tdb_null,
300                                         NULL,
301                                         startrecovery_fail_callback,
302                                         rec) != 0) {
303                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
304                 talloc_free(tmp_ctx);
305                 return -1;
306         }
307
308         talloc_free(tmp_ctx);
309         return 0;
310 }
311
312 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
313 {
314         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
315                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
316                 return;
317         }
318         if (node_pnn < ctdb->num_nodes) {
319                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
320         }
321
322         if (node_pnn == ctdb->pnn) {
323                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
324         }
325 }
326
327 /*
328   update the node capabilities for all connected nodes
329  */
330 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
331 {
332         uint32_t *nodes;
333         TALLOC_CTX *tmp_ctx;
334
335         tmp_ctx = talloc_new(ctdb);
336         CTDB_NO_MEMORY(ctdb, tmp_ctx);
337
338         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
339         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
340                                         nodes, 0,
341                                         CONTROL_TIMEOUT(),
342                                         false, tdb_null,
343                                         async_getcap_callback, NULL,
344                                         NULL) != 0) {
345                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
346                 talloc_free(tmp_ctx);
347                 return -1;
348         }
349
350         talloc_free(tmp_ctx);
351         return 0;
352 }
353
354 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
355 {
356         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
357
358         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
359         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
360 }
361
362 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
363 {
364         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
365
366         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
367         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
368 }
369
370 /*
371   change recovery mode on all nodes
372  */
373 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
374 {
375         TDB_DATA data;
376         uint32_t *nodes;
377         TALLOC_CTX *tmp_ctx;
378
379         tmp_ctx = talloc_new(ctdb);
380         CTDB_NO_MEMORY(ctdb, tmp_ctx);
381
382         /* freeze all nodes */
383         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
384         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
385                 int i;
386
387                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
388                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
389                                                 nodes, i,
390                                                 CONTROL_TIMEOUT(),
391                                                 false, tdb_null,
392                                                 NULL,
393                                                 set_recmode_fail_callback,
394                                                 rec) != 0) {
395                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
396                                 talloc_free(tmp_ctx);
397                                 return -1;
398                         }
399                 }
400         }
401
402
403         data.dsize = sizeof(uint32_t);
404         data.dptr = (unsigned char *)&rec_mode;
405
406         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
407                                         nodes, 0,
408                                         CONTROL_TIMEOUT(),
409                                         false, data,
410                                         NULL, NULL,
411                                         NULL) != 0) {
412                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
413                 talloc_free(tmp_ctx);
414                 return -1;
415         }
416
417         talloc_free(tmp_ctx);
418         return 0;
419 }
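
/*
 * Note on the freeze order above: when entering recovery the databases are
 * frozen one priority level at a time (1..NUM_DB_PRIORITIES) on all active
 * nodes before CTDB_CONTROL_SET_RECMODE is broadcast; leaving recovery only
 * needs the mode change.  A caller would typically do something like the
 * following (hedged sketch, error handling elided):
 *
 *     if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE) != 0) {
 *             return -1;
 *     }
 *     ... rebuild the databases ...
 *     if (set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL) != 0) {
 *             return -1;
 *     }
 */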
420
421 /*
422   change recovery master on all nodes
423  */
424 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
425 {
426         TDB_DATA data;
427         TALLOC_CTX *tmp_ctx;
428         uint32_t *nodes;
429
430         tmp_ctx = talloc_new(ctdb);
431         CTDB_NO_MEMORY(ctdb, tmp_ctx);
432
433         data.dsize = sizeof(uint32_t);
434         data.dptr = (unsigned char *)&pnn;
435
436         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
437         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
438                                         nodes, 0,
439                                         CONTROL_TIMEOUT(), false, data,
440                                         NULL, NULL,
441                                         NULL) != 0) {
442                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
443                 talloc_free(tmp_ctx);
444                 return -1;
445         }
446
447         talloc_free(tmp_ctx);
448         return 0;
449 }
450
451 /* update all remote nodes to use the same db priority that we have.
452    This can fail if the remote node has not yet been upgraded to
453    support this function, so we always return success and never fail
454    a recovery if this call fails.
455 */
456 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
457         struct ctdb_node_map *nodemap, 
458         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int db;
461         uint32_t *nodes;
462
463         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
464
465         /* step through all local databases */
466         for (db=0; db<dbmap->num;db++) {
467                 TDB_DATA data;
468                 struct ctdb_db_priority db_prio;
469                 int ret;
470
471                 db_prio.db_id     = dbmap->dbs[db].dbid;
472                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
473                 if (ret != 0) {
474                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
475                         continue;
476                 }
477
478                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
479
480                 data.dptr  = (uint8_t *)&db_prio;
481                 data.dsize = sizeof(db_prio);
482
483                 if (ctdb_client_async_control(ctdb,
484                                         CTDB_CONTROL_SET_DB_PRIORITY,
485                                         nodes, 0,
486                                         CONTROL_TIMEOUT(), false, data,
487                                         NULL, NULL,
488                                         NULL) != 0) {
489                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n", db_prio.db_id));
490                 }
491         }
492
493         return 0;
494 }                       
495
496 /*
497   ensure all other nodes have attached to any databases that we have
498  */
499 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
500                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
501 {
502         int i, j, db, ret;
503         struct ctdb_dbid_map *remote_dbmap;
504
505         /* verify that all other nodes have all our databases */
506         for (j=0; j<nodemap->num; j++) {
507         /* we don't need to check ourselves */
508                 if (nodemap->nodes[j].pnn == pnn) {
509                         continue;
510                 }
511         /* don't check nodes that are unavailable */
512                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
513                         continue;
514                 }
515
516                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
517                                          mem_ctx, &remote_dbmap);
518                 if (ret != 0) {
519                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
520                         return -1;
521                 }
522
523                 /* step through all local databases */
524                 for (db=0; db<dbmap->num;db++) {
525                         const char *name;
526
527
528                         for (i=0;i<remote_dbmap->num;i++) {
529                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
530                                         break;
531                                 }
532                         }
533                         /* the remote node already has this database */
534                         if (i!=remote_dbmap->num) {
535                                 continue;
536                         }
537                         /* ok so we need to create this database */
538                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
539                                                   dbmap->dbs[db].dbid, mem_ctx,
540                                                   &name);
541                         if (ret != 0) {
542                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
543                                 return -1;
544                         }
545                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
546                                                  nodemap->nodes[j].pnn,
547                                                  mem_ctx, name,
548                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
549                         if (ret != 0) {
550                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
551                                 return -1;
552                         }
553                 }
554         }
555
556         return 0;
557 }
558
559
560 /*
561   ensure we are attached to any databases that anyone else is attached to
562  */
563 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
564                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
565 {
566         int i, j, db, ret;
567         struct ctdb_dbid_map *remote_dbmap;
568
569         /* verify that we have all databases any other node has */
570         for (j=0; j<nodemap->num; j++) {
571         /* we don't need to check ourselves */
572                 if (nodemap->nodes[j].pnn == pnn) {
573                         continue;
574                 }
575         /* don't check nodes that are unavailable */
576                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
577                         continue;
578                 }
579
580                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
581                                          mem_ctx, &remote_dbmap);
582                 if (ret != 0) {
583                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", nodemap->nodes[j].pnn));
584                         return -1;
585                 }
586
587                 /* step through all databases on the remote node */
588                 for (db=0; db<remote_dbmap->num;db++) {
589                         const char *name;
590
591                         for (i=0;i<(*dbmap)->num;i++) {
592                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
593                                         break;
594                                 }
595                         }
596                         /* we already have this db locally */
597                         if (i!=(*dbmap)->num) {
598                                 continue;
599                         }
600                         /* ok so we need to create this database and
601                            rebuild dbmap
602                          */
603                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn,
604                                                   remote_dbmap->dbs[db].dbid, mem_ctx, &name);
605                         if (ret != 0) {
606                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
607                                           nodemap->nodes[j].pnn));
608                                 return -1;
609                         }
610                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name,
611                                                  remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
612                         if (ret != 0) {
613                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
614                                 return -1;
615                         }
616                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
617                         if (ret != 0) {
618                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
619                                 return -1;
620                         }
621                 }
622         }
623
624         return 0;
625 }
626
627
628 /*
629   pull the remote database contents from one node into the recdb
630  */
631 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
632                                     struct tdb_wrap *recdb, uint32_t dbid)
633 {
634         int ret;
635         TDB_DATA outdata;
636         struct ctdb_marshall_buffer *reply;
637         struct ctdb_rec_data *rec;
638         int i;
639         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
640
641         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
642                                CONTROL_TIMEOUT(), &outdata);
643         if (ret != 0) {
644                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
645                 talloc_free(tmp_ctx);
646                 return -1;
647         }
648
649         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
650
651         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
652                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
653                 talloc_free(tmp_ctx);
654                 return -1;
655         }
656         
657         rec = (struct ctdb_rec_data *)&reply->data[0];
658         
659         for (i=0;
660              i<reply->count;
661              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
662                 TDB_DATA key, data;
663                 struct ctdb_ltdb_header *hdr;
664                 TDB_DATA existing;
665                 
666                 key.dptr = &rec->data[0];
667                 key.dsize = rec->keylen;
668                 data.dptr = &rec->data[key.dsize];
669                 data.dsize = rec->datalen;
670                 
671                 hdr = (struct ctdb_ltdb_header *)data.dptr;
672
673                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
674                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
675                         talloc_free(tmp_ctx);
676                         return -1;
677                 }
678
679                 /* fetch the existing record, if any */
680                 existing = tdb_fetch(recdb->tdb, key);
681                 
682                 if (existing.dptr != NULL) {
683                         struct ctdb_ltdb_header header;
684                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
685                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
686                                          (unsigned)existing.dsize, srcnode));
687                                 free(existing.dptr);
688                                 talloc_free(tmp_ctx);
689                                 return -1;
690                         }
691                         header = *(struct ctdb_ltdb_header *)existing.dptr;
692                         free(existing.dptr);
693                         if (!(header.rsn < hdr->rsn ||
694                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
695                                 continue;
696                         }
697                 }
698                 
699                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
700                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
701                         talloc_free(tmp_ctx);
702                         return -1;                              
703                 }
704         }
705
706         talloc_free(tmp_ctx);
707
708         return 0;
709 }
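
/*
 * Worked example of the merge rule in pull_one_remote_database(): a pulled
 * record only replaces the existing recdb copy when it is strictly newer,
 * or when the RSNs tie and the existing copy's dmaster is not the recovery
 * master.  So with an existing copy at rsn=5/dmaster=recmaster, an incoming
 * rsn=5 copy is ignored; with an existing rsn=5 copy whose dmaster is some
 * other node, an incoming rsn=5 copy wins; and an incoming rsn=6 copy wins
 * regardless of dmaster.
 */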
710
711
712 struct pull_seqnum_cbdata {
713         int failed;
714         uint32_t pnn;
715         uint64_t seqnum;
716 };
717
718 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
719 {
720         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
721         uint64_t seqnum;
722
723         if (cb_data->failed != 0) {
724                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
725                 return;
726         }
727
728         if (res != 0) {
729                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
730                 cb_data->failed = 1;
731                 return;
732         }
733
734         if (outdata.dsize != sizeof(uint64_t)) {
735                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
736                 cb_data->failed = 1;
737                 return;
738         }
739
740         seqnum = *((uint64_t *)outdata.dptr);
741
742         if (seqnum > cb_data->seqnum ||
743             (cb_data->pnn == -1 && seqnum == 0)) {
744                 cb_data->seqnum = seqnum;
745                 cb_data->pnn = node_pnn;
746         }
747 }
748
749 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
750 {
751         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
752
753         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
754         cb_data->failed = 1;
755 }
756
757 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
758                                 struct ctdb_recoverd *rec, 
759                                 struct ctdb_node_map *nodemap, 
760                                 struct tdb_wrap *recdb, uint32_t dbid)
761 {
762         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
763         uint32_t *nodes;
764         TDB_DATA data;
765         uint32_t outdata[2];
766         struct pull_seqnum_cbdata *cb_data;
767
768         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
769
770         outdata[0] = dbid;
771         outdata[1] = 0;
772
773         data.dsize = sizeof(outdata);
774         data.dptr  = (uint8_t *)&outdata[0];
775
776         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
777         if (cb_data == NULL) {
778                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
779                 talloc_free(tmp_ctx);
780                 return -1;
781         }
782
783         cb_data->failed = 0;
784         cb_data->pnn    = -1;
785         cb_data->seqnum = 0;
786         
787         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
788         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
789                                         nodes, 0,
790                                         CONTROL_TIMEOUT(), false, data,
791                                         pull_seqnum_cb,
792                                         pull_seqnum_fail_cb,
793                                         cb_data) != 0) {
794                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
795
796                 talloc_free(tmp_ctx);
797                 return -1;
798         }
799
800         if (cb_data->failed != 0) {
801                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
802                 talloc_free(tmp_ctx);
803                 return -1;
804         }
805
806         if (cb_data->pnn == -1) {
807                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
808                 talloc_free(tmp_ctx);
809                 return -1;
810         }
811
812         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
813
814         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
815                 DEBUG(DEBUG_ERR, ("Failed to pull highest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
816                 talloc_free(tmp_ctx);
817                 return -1;
818         }
819
820         talloc_free(tmp_ctx);
821         return 0;
822 }
823
824
825 /*
826   pull all the remote database contents into the recdb
827  */
828 static int pull_remote_database(struct ctdb_context *ctdb,
829                                 struct ctdb_recoverd *rec, 
830                                 struct ctdb_node_map *nodemap, 
831                                 struct tdb_wrap *recdb, uint32_t dbid,
832                                 bool persistent)
833 {
834         int j;
835
836         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
837                 int ret;
838                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
839                 if (ret == 0) {
840                         return 0;
841                 }
842         }
843
844         /* pull all records from all other nodes across onto this node
845            (this merges based on rsn)
846         */
847         for (j=0; j<nodemap->num; j++) {
849                 /* don't merge from nodes that are unavailable */
849                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
850                         continue;
851                 }
852                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
853                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
854                                  nodemap->nodes[j].pnn));
855                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
856                         return -1;
857                 }
858         }
859         
860         return 0;
861 }
862
863
864 /*
865   update flags on all active nodes
866  */
867 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
868 {
869         int ret;
870
871         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
872         if (ret != 0) {
873                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
874                 return -1;
875         }
876
877         return 0;
878 }
879
880 /*
881   ensure all nodes have the same vnnmap we do
882  */
883 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
884                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
885 {
886         int j, ret;
887
888         /* push the new vnn map out to all the nodes */
889         for (j=0; j<nodemap->num; j++) {
890                 /* don't push to nodes that are unavailable */
891                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
892                         continue;
893                 }
894
895                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
896                 if (ret != 0) {
897                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", nodemap->nodes[j].pnn));
898                         return -1;
899                 }
900         }
901
902         return 0;
903 }
904
905
906 struct vacuum_info {
907         struct vacuum_info *next, *prev;
908         struct ctdb_recoverd *rec;
909         uint32_t srcnode;
910         struct ctdb_db_context *ctdb_db;
911         struct ctdb_marshall_buffer *recs;
912         struct ctdb_rec_data *r;
913 };
914
915 static void vacuum_fetch_next(struct vacuum_info *v);
916
917 /*
918   called when a vacuum fetch has completed - just free it and do the next one
919  */
920 static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
921 {
922         struct vacuum_info *v = talloc_get_type(state->async.private_data, struct vacuum_info);
923         talloc_free(state);
924         vacuum_fetch_next(v);
925 }
926
927
928 /*
929   process the next element from the vacuum list
930 */
931 static void vacuum_fetch_next(struct vacuum_info *v)
932 {
933         struct ctdb_call call;
934         struct ctdb_rec_data *r;
935
936         while (v->recs->count) {
937                 struct ctdb_client_call_state *state;
938                 TDB_DATA data;
939                 struct ctdb_ltdb_header *hdr;
940
941                 ZERO_STRUCT(call);
942                 call.call_id = CTDB_NULL_FUNC;
943                 call.flags = CTDB_IMMEDIATE_MIGRATION;
944                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
945
946                 r = v->r;
947                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
948                 v->recs->count--;
949
950                 call.key.dptr = &r->data[0];
951                 call.key.dsize = r->keylen;
952
953                 /* ensure we don't block this daemon - just skip a record if we can't get
954                    the chainlock */
955                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
956                         continue;
957                 }
958
959                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
960                 if (data.dptr == NULL) {
961                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
962                         continue;
963                 }
964
965                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
966                         free(data.dptr);
967                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
968                         continue;
969                 }
970                 
971                 hdr = (struct ctdb_ltdb_header *)data.dptr;
972                 if (hdr->dmaster == v->rec->ctdb->pnn) {
973                         /* its already local */
974                         free(data.dptr);
975                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
976                         continue;
977                 }
978
979                 free(data.dptr);
980
981                 state = ctdb_call_send(v->ctdb_db, &call);
982                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
983                 if (state == NULL) {
984                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
985                         talloc_free(v);
986                         return;
987                 }
988                 state->async.fn = vacuum_fetch_callback;
989                 state->async.private_data = v;
990                 return;
991         }
992
993         talloc_free(v);
994 }
995
996
997 /*
998   destroy a vacuum info structure
999  */
1000 static int vacuum_info_destructor(struct vacuum_info *v)
1001 {
1002         DLIST_REMOVE(v->rec->vacuum_info, v);
1003         return 0;
1004 }
1005
1006
1007 /*
1008   handler for vacuum fetch
1009 */
1010 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
1011                                  TDB_DATA data, void *private_data)
1012 {
1013         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1014         struct ctdb_marshall_buffer *recs;
1015         int ret, i;
1016         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1017         const char *name;
1018         struct ctdb_dbid_map *dbmap=NULL;
1019         bool persistent = false;
1020         struct ctdb_db_context *ctdb_db;
1021         struct ctdb_rec_data *r;
1022         uint32_t srcnode;
1023         struct vacuum_info *v;
1024
1025         recs = (struct ctdb_marshall_buffer *)data.dptr;
1026         r = (struct ctdb_rec_data *)&recs->data[0];
1027
1028         if (recs->count == 0) {
1029                 talloc_free(tmp_ctx);
1030                 return;
1031         }
1032
1033         srcnode = r->reqid;
1034
1035         for (v=rec->vacuum_info;v;v=v->next) {
1036                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
1037                         /* we're already working on records from this node */
1038                         talloc_free(tmp_ctx);
1039                         return;
1040                 }
1041         }
1042
1043         /* work out if the database is persistent */
1044         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1045         if (ret != 0) {
1046                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1047                 talloc_free(tmp_ctx);
1048                 return;
1049         }
1050
1051         for (i=0;i<dbmap->num;i++) {
1052                 if (dbmap->dbs[i].dbid == recs->db_id) {
1053                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1054                         break;
1055                 }
1056         }
1057         if (i == dbmap->num) {
1058                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1059                 talloc_free(tmp_ctx);
1060                 return;         
1061         }
1062
1063         /* find the name of this database */
1064         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1065                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1066                 talloc_free(tmp_ctx);
1067                 return;
1068         }
1069
1070         /* attach to it */
1071         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1072         if (ctdb_db == NULL) {
1073                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1074                 talloc_free(tmp_ctx);
1075                 return;
1076         }
1077
1078         v = talloc_zero(rec, struct vacuum_info);
1079         if (v == NULL) {
1080                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1081                 talloc_free(tmp_ctx);
1082                 return;
1083         }
1084
1085         v->rec = rec;
1086         v->srcnode = srcnode;
1087         v->ctdb_db = ctdb_db;
1088         v->recs = talloc_memdup(v, recs, data.dsize);
1089         if (v->recs == NULL) {
1090                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1091                 talloc_free(v);
1092                 talloc_free(tmp_ctx);
1093                 return;         
1094         }
1095         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1096
1097         DLIST_ADD(rec->vacuum_info, v);
1098
1099         talloc_set_destructor(v, vacuum_info_destructor);
1100
1101         vacuum_fetch_next(v);
1102         talloc_free(tmp_ctx);
1103 }
1104
1105
1106 /*
1107   called when ctdb_wait_timeout should finish
1108  */
1109 static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
1110                               struct timeval yt, void *p)
1111 {
1112         uint32_t *timed_out = (uint32_t *)p;
1113         (*timed_out) = 1;
1114 }
1115
1116 /*
1117   wait for a given number of seconds
1118  */
1119 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1120 {
1121         uint32_t timed_out = 0;
1122         time_t usecs = (secs - (time_t)secs) * 1000000;
1123         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1124         while (!timed_out) {
1125                 event_loop_once(ctdb->ev);
1126         }
1127 }
1128
1129 /*
1130   called when an election times out (ends)
1131  */
1132 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1133                                   struct timeval t, void *p)
1134 {
1135         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1136         rec->election_timeout = NULL;
1137         fast_start = false;
1138
1139         DEBUG(DEBUG_WARNING,(__location__ " Election timed out\n"));
1140 }
1141
1142
1143 /*
1144   wait for an election to finish. It finishes election_timeout seconds after
1145   the last election packet is received
1146  */
1147 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1148 {
1149         struct ctdb_context *ctdb = rec->ctdb;
1150         while (rec->election_timeout) {
1151                 event_loop_once(ctdb->ev);
1152         }
1153 }
1154
1155 /*
1156   Update our local flags from all remote connected nodes. 
1157   This is only run when we are, or believe we are, the recovery master
1158  */
1159 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1160 {
1161         int j;
1162         struct ctdb_context *ctdb = rec->ctdb;
1163         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1164
1165         /* get the nodemap for all active remote nodes and verify
1166            they are the same as for this node
1167          */
1168         for (j=0; j<nodemap->num; j++) {
1169                 struct ctdb_node_map *remote_nodemap=NULL;
1170                 int ret;
1171
1172                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1173                         continue;
1174                 }
1175                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1176                         continue;
1177                 }
1178
1179                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1180                                            mem_ctx, &remote_nodemap);
1181                 if (ret != 0) {
1182                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1183                                   nodemap->nodes[j].pnn));
1184                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1185                         talloc_free(mem_ctx);
1186                         return MONITOR_FAILED;
1187                 }
1188                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1189                         /* We should tell our daemon about this so it
1190                            updates its flags or else we will log the same 
1191                            message again in the next iteration of recovery.
1192                            Since we are the recovery master we can just as
1193                            well update the flags on all nodes.
1194                         */
1195                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1196                         if (ret != 0) {
1197                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1198                                 talloc_free(mem_ctx);
                                 return MONITOR_FAILED;
1199                         }
1200
1201                         /* Update our local copy of the flags in the recovery
1202                            daemon.
1203                         */
1204                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1205                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1206                                  nodemap->nodes[j].flags));
1207                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1208                 }
1209                 talloc_free(remote_nodemap);
1210         }
1211         talloc_free(mem_ctx);
1212         return MONITOR_OK;
1213 }
1214
1215
1216 /* Create a new random generation id.
1217    The generation id cannot be the INVALID_GENERATION id
1218 */
1219 static uint32_t new_generation(void)
1220 {
1221         uint32_t generation;
1222
1223         while (1) {
1224                 generation = random();
1225
1226                 if (generation != INVALID_GENERATION) {
1227                         break;
1228                 }
1229         }
1230
1231         return generation;
1232 }
1233
1234
1235 /*
1236   create a temporary working database
1237  */
1238 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1239 {
1240         char *name;
1241         struct tdb_wrap *recdb;
1242         unsigned tdb_flags;
1243
1244         /* open up the temporary recovery database */
1245         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1246                                ctdb->db_directory_state,
1247                                ctdb->pnn);
1248         if (name == NULL) {
1249                 return NULL;
1250         }
1251         unlink(name);
1252
1253         tdb_flags = TDB_NOLOCK;
1254         if (ctdb->valgrinding) {
1255                 tdb_flags |= TDB_NOMMAP;
1256         }
1257         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1258
1259         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1260                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1261         if (recdb == NULL) {
1262                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1263         }
1264
1265         talloc_free(name);
1266
1267         return recdb;
1268 }
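
/*
 * The recdb created above is a scratch tdb private to the recovery daemon:
 * it is unlinked and recreated for each use, which is presumably why
 * TDB_NOLOCK is safe here.  TDB_NOMMAP is only added under valgrind so that
 * access errors stay visible, and the hash size follows the
 * database_hash_size tunable so that merging all remote copies does not
 * degrade into long hash chains.
 */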
1269
1270
1271 /* 
1272    a traverse function for pulling all relevant records from recdb
1273  */
1274 struct recdb_data {
1275         struct ctdb_context *ctdb;
1276         struct ctdb_marshall_buffer *recdata;
1277         uint32_t len;
1278         uint32_t allocated_len;
1279         bool failed;
1280         bool persistent;
1281 };
1282
1283 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1284 {
1285         struct recdb_data *params = (struct recdb_data *)p;
1286         struct ctdb_rec_data *rec;
1287         struct ctdb_ltdb_header *hdr;
1288
1289         /*
1290          * skip empty records - but NOT for persistent databases:
1291          *
1292          * The record-by-record mode of recovery deletes empty records.
1293          * For persistent databases, this can lead to data corruption
1294          * by deleting records that should be there:
1295          *
1296          * - Assume the cluster has been running for a while.
1297          *
1298          * - A record R in a persistent database has been created and
1299          *   deleted a couple of times, the last operation being deletion,
1300          *   leaving an empty record with a high RSN, say 10.
1301          *
1302          * - Now a node N is turned off.
1303          *
1304          * - This leaves the local copy of the database on N with the empty
1305          *   copy of R at RSN 10. On all other nodes, the recovery has deleted
1306          *   the copy of record R.
1307          *
1308          * - Now the record is created again while node N is turned off.
1309          *   This creates R with RSN = 1 on all nodes except for N.
1310          *
1311          * - Now node N is turned on again. The following recovery will choose
1312          *   the older empty copy of R due to RSN 10 > RSN 1.
1313          *
1314          * ==> Hence the record is gone after the recovery.
1315          *
1316          * On databases like Samba's registry, this can damage the higher-level
1317          * data structures built from the various tdb-level records.
1318          */
1319         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1320                 return 0;
1321         }
1322
1323         /* update the dmaster field to point to us */
1324         hdr = (struct ctdb_ltdb_header *)data.dptr;
1325         if (!params->persistent) {
1326                 hdr->dmaster = params->ctdb->pnn;
1327                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1328         }
1329
1330         /* add the record to the blob ready to send to the nodes */
1331         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1332         if (rec == NULL) {
1333                 params->failed = true;
1334                 return -1;
1335         }
1336         if (params->len + rec->length >= params->allocated_len) {
1337                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1338                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1339         }
1340         if (params->recdata == NULL) {
1341                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1342                          rec->length + params->len));
1343                 params->failed = true;
1344                 return -1;
1345         }
1346         params->recdata->count++;
1347         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1348         params->len += rec->length;
1349         talloc_free(rec);
1350
1351         return 0;
1352 }
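
/*
 * Buffer growth in traverse_recdb() is amortised: when appending a record
 * would overrun allocated_len, the buffer is grown to hold the new record
 * plus pulldb_preallocation_size extra bytes.  For example, with len=1000,
 * rec->length=200 and allocated_len=1100, the new allocation becomes
 * 1200 + pulldb_preallocation_size rather than just 1200.  Rewriting the
 * dmaster to our own pnn for non-persistent records is what makes every
 * node agree, after the push, that the recovery master owns all records.
 */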
1353
1354 /*
1355   push the recdb database out to all nodes
1356  */
1357 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1358                                bool persistent,
1359                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1360 {
1361         struct recdb_data params;
1362         struct ctdb_marshall_buffer *recdata;
1363         TDB_DATA outdata;
1364         TALLOC_CTX *tmp_ctx;
1365         uint32_t *nodes;
1366
1367         tmp_ctx = talloc_new(ctdb);
1368         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1369
1370         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1371         CTDB_NO_MEMORY(ctdb, recdata);
1372
1373         recdata->db_id = dbid;
1374
1375         params.ctdb = ctdb;
1376         params.recdata = recdata;
1377         params.len = offsetof(struct ctdb_marshall_buffer, data);
1378         params.allocated_len = params.len;
1379         params.failed = false;
1380         params.persistent = persistent;
1381
1382         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1383                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1384                 talloc_free(params.recdata);
1385                 talloc_free(tmp_ctx);
1386                 return -1;
1387         }
1388
1389         if (params.failed) {
1390                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1391                 talloc_free(params.recdata);
1392                 talloc_free(tmp_ctx);
1393                 return -1;              
1394         }
1395
1396         recdata = params.recdata;
1397
1398         outdata.dptr = (void *)recdata;
1399         outdata.dsize = params.len;
1400
1401         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1402         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1403                                         nodes, 0,
1404                                         CONTROL_TIMEOUT(), false, outdata,
1405                                         NULL, NULL,
1406                                         NULL) != 0) {
1407                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1408                 talloc_free(recdata);
1409                 talloc_free(tmp_ctx);
1410                 return -1;
1411         }
1412
1413         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x with %u records\n",
1414                   dbid, recdata->count));
1415
1416         talloc_free(recdata);
1417         talloc_free(tmp_ctx);
1418
1419         return 0;
1420 }
1421
1422
1423 /*
1424   go through a full recovery on one database 
1425  */
1426 static int recover_database(struct ctdb_recoverd *rec, 
1427                             TALLOC_CTX *mem_ctx,
1428                             uint32_t dbid,
1429                             bool persistent,
1430                             uint32_t pnn, 
1431                             struct ctdb_node_map *nodemap,
1432                             uint32_t transaction_id)
1433 {
1434         struct tdb_wrap *recdb;
1435         int ret;
1436         struct ctdb_context *ctdb = rec->ctdb;
1437         TDB_DATA data;
1438         struct ctdb_control_wipe_database w;
1439         uint32_t *nodes;
1440
1441         recdb = create_recdb(ctdb, mem_ctx);
1442         if (recdb == NULL) {
1443                 return -1;
1444         }
1445
1446         /* pull all remote databases onto the recdb */
1447         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1448         if (ret != 0) {
1449                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1450                 return -1;
1451         }
1452
1453         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1454
1455         /* wipe all the remote databases. This is safe as we are in a transaction */
1456         w.db_id = dbid;
1457         w.transaction_id = transaction_id;
1458
1459         data.dptr = (void *)&w;
1460         data.dsize = sizeof(w);
1461
1462         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1463         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1464                                         nodes, 0,
1465                                         CONTROL_TIMEOUT(), false, data,
1466                                         NULL, NULL,
1467                                         NULL) != 0) {
1468                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1469                 talloc_free(recdb);
1470                 return -1;
1471         }
1472         
1473         /* push out the correct database. This sets the dmaster and skips 
1474            the empty records */
1475         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1476         if (ret != 0) {
1477                 talloc_free(recdb);
1478                 return -1;
1479         }
1480
1481         /* all done with this database */
1482         talloc_free(recdb);
1483
1484         return 0;
1485 }
1486
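/*
  refresh the cached lists of known and available public IPs from every
  active node; flag a takeover run if a node's IP allocation looks wrong
  and, on failure, return the PNN of the node that could not be queried
  in *culprit
 */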
1487 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1488                                          struct ctdb_recoverd *rec,
1489                                          struct ctdb_node_map *nodemap,
1490                                          uint32_t *culprit)
1491 {
1492         int j;
1493         int ret;
1494
1495         if (ctdb->num_nodes != nodemap->num) {
1496                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1497                                   ctdb->num_nodes, nodemap->num));
1498                 if (culprit) {
1499                         *culprit = ctdb->pnn;
1500                 }
1501                 return -1;
1502         }
1503
1504         for (j=0; j<nodemap->num; j++) {
1505                 /* For readability */
1506                 struct ctdb_node *node = ctdb->nodes[j];
1507
1508                 /* release any existing data */
1509                 if (node->known_public_ips) {
1510                         talloc_free(node->known_public_ips);
1511                         node->known_public_ips = NULL;
1512                 }
1513                 if (node->available_public_ips) {
1514                         talloc_free(node->available_public_ips);
1515                         node->available_public_ips = NULL;
1516                 }
1517
1518                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1519                         continue;
1520                 }
1521
1522                 /* Retrieve the list of known public IPs from the node */
1523                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1524                                         CONTROL_TIMEOUT(),
1525                                         node->pnn,
1526                                         ctdb->nodes,
1527                                         0,
1528                                         &node->known_public_ips);
1529                 if (ret != 0) {
1530                         DEBUG(DEBUG_ERR,
1531                               ("Failed to read known public IPs from node: %u\n",
1532                                node->pnn));
1533                         if (culprit) {
1534                                 *culprit = node->pnn;
1535                         }
1536                         return -1;
1537                 }
1538
1539                 if (ctdb->do_checkpublicip &&
1540                     rec->takeover_runs_disable_ctx == NULL &&
1541                     verify_remote_ip_allocation(ctdb,
1542                                                  node->known_public_ips,
1543                                                  node->pnn)) {
1544                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1545                         rec->need_takeover_run = true;
1546                 }
1547
1548                 /* Retrieve the list of available public IPs from the node */
1549                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1550                                         CONTROL_TIMEOUT(),
1551                                         node->pnn,
1552                                         ctdb->nodes,
1553                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1554                                         &node->available_public_ips);
1555                 if (ret != 0) {
1556                         DEBUG(DEBUG_ERR,
1557                               ("Failed to read available public IPs from node: %u\n",
1558                                node->pnn));
1559                         if (culprit) {
1560                                 *culprit = node->pnn;
1561                         }
1562                         return -1;
1563                 }
1564         }
1565
1566         return 0;
1567 }
1568
1569 /* when we start a recovery, make sure all nodes use the same reclock file
1570    setting
1571 */
1572 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1573 {
1574         struct ctdb_context *ctdb = rec->ctdb;
1575         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1576         TDB_DATA data;
1577         uint32_t *nodes;
1578
1579         if (ctdb->recovery_lock_file == NULL) {
1580                 data.dptr  = NULL;
1581                 data.dsize = 0;
1582         } else {
1583                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1584                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1585         }
1586
1587         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1588         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1589                                         nodes, 0,
1590                                         CONTROL_TIMEOUT(),
1591                                         false, data,
1592                                         NULL, NULL,
1593                                         rec) != 0) {
1594                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1595                 talloc_free(tmp_ctx);
1596                 return -1;
1597         }
1598
1599         talloc_free(tmp_ctx);
1600         return 0;
1601 }
1602
1603
1604 /*
1605  * This callback is called for every node that failed to execute ctdb_takeover_run();
1606  * it marks that node as a recovery fail culprit so it accumulates banning credits.
1607  */
1608 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1609 {
1610         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1611
1612         if (callback_data != NULL) {
1613                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1614
1615                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1616
1617                 ctdb_set_culprit(rec, node_pnn);
1618         }
1619 }
1620
1621
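/*
  ban any node that has accumulated at least 2 * <number of nodes>
  banning credits for the configured recovery ban period, and report
  whether we ended up banning ourself
 */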
1622 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1623 {
1624         struct ctdb_context *ctdb = rec->ctdb;
1625         int i;
1626         struct ctdb_banning_state *ban_state;
1627
1628         *self_ban = false;
1629         for (i=0; i<ctdb->num_nodes; i++) {
1630                 if (ctdb->nodes[i]->ban_state == NULL) {
1631                         continue;
1632                 }
1633                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1634                 if (ban_state->count < 2*ctdb->num_nodes) {
1635                         continue;
1636                 }
1637
1638                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1639                         ctdb->nodes[i]->pnn, ban_state->count,
1640                         ctdb->tunable.recovery_ban_period));
1641                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1642                 ban_state->count = 0;
1643
1644                 /* Banning ourself? */
1645                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1646                         *self_ban = true;
1647                 }
1648         }
1649 }
1650
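/*
  perform an IP takeover run; while it is in progress the other
  connected nodes are told (via CTDB_SRVID_DISABLE_TAKEOVER_RUNS) not to
  start takeover runs of their own
 */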
1651 static bool do_takeover_run(struct ctdb_recoverd *rec,
1652                             struct ctdb_node_map *nodemap,
1653                             bool banning_credits_on_fail)
1654 {
1655         uint32_t *nodes = NULL;
1656         struct srvid_request_data dtr;
1657         TDB_DATA data;
1658         int i;
1659         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1660         int ret;
1661         bool ok;
1662
1663         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1664
1665         if (rec->takeover_run_in_progress) {
1666                 DEBUG(DEBUG_ERR, (__location__
1667                                   " takeover run already in progress \n"));
1668                 ok = false;
1669                 goto done;
1670         }
1671
1672         rec->takeover_run_in_progress = true;
1673
1674         /* If takeover runs are disabled then fail... */
1675         if (rec->takeover_runs_disable_ctx != NULL) {
1676                 DEBUG(DEBUG_ERR,
1677                       ("Takeover runs are disabled so refusing to run one\n"));
1678                 ok = false;
1679                 goto done;
1680         }
1681
1682         /* Disable IP checks (takeover runs, really) on other nodes
1683          * while doing this takeover run.  This will stop those other
1684          * nodes from triggering takeover runs when they think they should
1685          * be hosting an IP but it isn't yet on an interface.  Don't
1686          * wait for replies since a failure here might cause some
1687          * noise in the logs but will not actually cause a problem.
1688          */
1689         dtr.srvid = 0; /* No reply */
1690         dtr.pnn = -1;
1691
1692         data.dptr  = (uint8_t*)&dtr;
1693         data.dsize = sizeof(dtr);
1694
1695         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1696
1697         /* Disable for 60 seconds.  This can be a tunable later if
1698          * necessary.
1699          */
1700         dtr.data = 60;
1701         for (i = 0; i < talloc_array_length(nodes); i++) {
1702                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1703                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1704                                              data) != 0) {
1705                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1706                 }
1707         }
1708
1709         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1710                                 rec->force_rebalance_nodes,
1711                                 takeover_fail_callback,
1712                                 banning_credits_on_fail ? rec : NULL);
1713
1714         /* Reenable takeover runs and IP checks on other nodes */
1715         dtr.data = 0;
1716         for (i = 0; i < talloc_array_length(nodes); i++) {
1717                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1718                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1719                                              data) != 0) {
1720                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1721                 }
1722         }
1723
1724         if (ret != 0) {
1725                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1726                 ok = false;
1727                 goto done;
1728         }
1729
1730         ok = true;
1731         /* Takeover run was successful so clear force rebalance targets */
1732         if (rebalance_nodes == rec->force_rebalance_nodes) {
1733                 TALLOC_FREE(rec->force_rebalance_nodes);
1734         } else {
1735                 DEBUG(DEBUG_WARNING,
1736                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1737         }
1738 done:
1739         rec->need_takeover_run = !ok;
1740         talloc_free(nodes);
1741         rec->takeover_run_in_progress = false;
1742
1743         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1744         return ok;
1745 }
1746
1747
1748 /*
1749   we are the recmaster, and recovery is needed - start a recovery run
1750  */
1751 static int do_recovery(struct ctdb_recoverd *rec, 
1752                        TALLOC_CTX *mem_ctx, uint32_t pnn,
1753                        struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
1754 {
1755         struct ctdb_context *ctdb = rec->ctdb;
1756         int i, j, ret;
1757         uint32_t generation;
1758         struct ctdb_dbid_map *dbmap;
1759         TDB_DATA data;
1760         uint32_t *nodes;
1761         struct timeval start_time;
1762         uint32_t culprit = (uint32_t)-1;
1763         bool self_ban;
1764
1765         DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));
1766
1767         /* if recovery fails, force it again */
1768         rec->need_recovery = true;
1769
1770         ban_misbehaving_nodes(rec, &self_ban);
1771         if (self_ban) {
1772                 DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
1773                 return -1;
1774         }
1775
1776         if (ctdb->tunable.verify_recovery_lock != 0) {
1777                 DEBUG(DEBUG_ERR,("Taking out recovery lock from recovery daemon\n"));
1778                 start_time = timeval_current();
1779                 if (!ctdb_recovery_lock(ctdb, true)) {
1780                         DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
1781                                          "and ban ourself for %u seconds\n",
1782                                          ctdb->tunable.recovery_ban_period));
1783                         ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
1784                         return -1;
1785                 }
1786                 ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&start_time));
1787                 DEBUG(DEBUG_NOTICE,("Recovery lock taken successfully by recovery daemon\n"));
1788         }
1789
1790         DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));
1791
1792         /* get a list of all databases */
1793         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
1794         if (ret != 0) {
1795                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
1796                 return -1;
1797         }
1798
1799         /* we do the db creation before we set the recovery mode, so the freeze happens
1800            on all databases we will be dealing with. */
1801
1802         /* verify that we have all the databases any other node has */
1803         ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
1804         if (ret != 0) {
1805                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
1806                 return -1;
1807         }
1808
1809         /* verify that all other nodes have all our databases */
1810         ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
1811         if (ret != 0) {
1812                 DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
1813                 return -1;
1814         }
1815         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));
1816
1817         /* update the database priority for all remote databases */
1818         ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
1819         if (ret != 0) {
1820                 DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
1821         }
1822         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));
1823
1824
1825         /* update all other nodes to use the same setting for reclock files
1826            as the local recovery master.
1827         */
1828         sync_recovery_lock_file_across_cluster(rec);
1829
1830         /* set recovery mode to active on all nodes */
1831         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
1832         if (ret != 0) {
1833                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
1834                 return -1;
1835         }
1836
1837         /* execute the "startrecovery" event script on all nodes */
1838         ret = run_startrecovery_eventscript(rec, nodemap);
1839         if (ret!=0) {
1840                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
1841                 return -1;
1842         }
1843
1844         /*
1845           update all nodes to have the same flags that we have
1846          */
1847         for (i=0;i<nodemap->num;i++) {
1848                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
1849                         continue;
1850                 }
1851
1852                 ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
1853                 if (ret != 0) {
1854                         if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1855                                 DEBUG(DEBUG_WARNING, (__location__ " Unable to update flags on inactive node %d\n", i));
1856                         } else {
1857                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
1858                                 return -1;
1859                         }
1860                 }
1861         }
1862
1863         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));
1864
1865         /* pick a new generation number */
1866         generation = new_generation();
1867
1868         /* change the vnnmap on this node to use the new generation
1869            number, but not on any other nodes.
1870            This guarantees that if we abort the recovery prematurely
1871            for some reason (e.g. a node stops responding), we can just
1872            return immediately and recovery will be re-entered again
1873            shortly afterwards.
1874            I.e. we deliberately leave the cluster with an inconsistent
1875            generation id so that we can abort recovery at any stage and
1876            just restart it from scratch.
1877          */
1878         vnnmap->generation = generation;
1879         ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
1880         if (ret != 0) {
1881                 DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
1882                 return -1;
1883         }
1884
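        /* Start a transaction on every active node, keyed on the new
         * generation number, so that the wipe and push of each database
         * below can be committed (or cancelled) as a single unit.
         */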
1885         data.dptr = (void *)&generation;
1886         data.dsize = sizeof(uint32_t);
1887
1888         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
1889         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
1890                                         nodes, 0,
1891                                         CONTROL_TIMEOUT(), false, data,
1892                                         NULL,
1893                                         transaction_start_fail_callback,
1894                                         rec) != 0) {
1895                 DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
1896                 if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
1897                                         nodes, 0,
1898                                         CONTROL_TIMEOUT(), false, tdb_null,
1899                                         NULL,
1900                                         NULL,
1901                                         NULL) != 0) {
1902                         DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
1903                 }
1904                 return -1;
1905         }
1906
1907         DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));
1908
1909         for (i=0;i<dbmap->num;i++) {
1910                 ret = recover_database(rec, mem_ctx,
1911                                        dbmap->dbs[i].dbid,
1912                                        dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
1913                                        pnn, nodemap, generation);
1914                 if (ret != 0) {
1915                         DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
1916                         return -1;
1917                 }
1918         }
1919
1920         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));
1921
1922         /* commit all the changes */
1923         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
1924                                         nodes, 0,
1925                                         CONTROL_TIMEOUT(), false, data,
1926                                         NULL, NULL,
1927                                         NULL) != 0) {
1928                 DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
1929                 return -1;
1930         }
1931
1932         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
1933         
1934
1935         /* update the capabilities for all nodes */
1936         ret = update_capabilities(ctdb, nodemap);
1937         if (ret!=0) {
1938                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
1939                 return -1;
1940         }
1941
1942         /* build a new vnn map with all the currently active and
1943            unbanned nodes */
1944         generation = new_generation();
1945         vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
1946         CTDB_NO_MEMORY(ctdb, vnnmap);
1947         vnnmap->generation = generation;
1948         vnnmap->size = 0;
1949         vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
1950         CTDB_NO_MEMORY(ctdb, vnnmap->map);
1951         for (i=j=0;i<nodemap->num;i++) {
1952                 if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
1953                         continue;
1954                 }
1955                 if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
1956                         /* this node cannot be an lmaster */
1957                         DEBUG(DEBUG_DEBUG, ("Node %d cannot be an lmaster, skipping it\n", i));
1958                         continue;
1959                 }
1960
1961                 vnnmap->size++;
1962                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1963                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1964                 vnnmap->map[j++] = nodemap->nodes[i].pnn;
1965
1966         }
1967         if (vnnmap->size == 0) {
1968                 DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
1969                 vnnmap->size++;
1970                 vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
1971                 CTDB_NO_MEMORY(ctdb, vnnmap->map);
1972                 vnnmap->map[0] = pnn;
1973         }       
1974
1975         /* update to the new vnnmap on all nodes */
1976         ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
1977         if (ret != 0) {
1978                 DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
1979                 return -1;
1980         }
1981
1982         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));
1983
1984         /* update recmaster to point to us for all nodes */
1985         ret = set_recovery_master(ctdb, nodemap, pnn);
1986         if (ret!=0) {
1987                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
1988                 return -1;
1989         }
1990
1991         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));
1992
1993         /* disable recovery mode */
1994         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
1995         if (ret != 0) {
1996                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
1997                 return -1;
1998         }
1999
2000         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));
2001
2002         /* Fetch known/available public IPs from each active node */
2003         ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
2004         if (ret != 0) {
2005                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2006                                  culprit));
2007                 rec->need_takeover_run = true;
2008                 return -1;
2009         }
2010
2011         do_takeover_run(rec, nodemap, false);
2012
2013         /* execute the "recovered" event script on all nodes */
2014         ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
2015         if (ret!=0) {
2016                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
2017                 return -1;
2018         }
2019
2020         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));
2021
2022         /* send a message to all clients telling them that the cluster 
2023            has been reconfigured */
2024         ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
2025                                        CTDB_SRVID_RECONFIGURE, tdb_null);
2026         if (ret != 0) {
2027                 DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
2028                 return -1;
2029         }
2030
2031         DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));
2032
2033         rec->need_recovery = false;
2034
2035         /* we managed to complete a full recovery, make sure to forgive
2036            any past sins by the nodes that could now participate in the
2037            recovery.
2038         */
2039         DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
2040         for (i=0;i<nodemap->num;i++) {
2041                 struct ctdb_banning_state *ban_state;
2042
2043                 if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
2044                         continue;
2045                 }
2046
2047                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
2048                 if (ban_state == NULL) {
2049                         continue;
2050                 }
2051
2052                 ban_state->count = 0;
2053         }
2054
2055
2056         /* We just finished a recovery successfully. 
2057            We now wait for rerecovery_timeout before we allow 
2058            another recovery to take place.
2059         */
2060         DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be suppressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
2061         ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
2062         DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));
2063
2064         return 0;
2065 }
2066
2067
2068 /*
2069   elections are won by first checking the number of connected nodes, then
2070   the priority time, then the pnn
2071  */
2072 struct election_message {
2073         uint32_t num_connected;
2074         struct timeval priority_time;
2075         uint32_t pnn;
2076         uint32_t node_flags;
2077 };
2078
2079 /*
2080   form this node's election data
2081  */
2082 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2083 {
2084         int ret, i;
2085         struct ctdb_node_map *nodemap;
2086         struct ctdb_context *ctdb = rec->ctdb;
2087
2088         ZERO_STRUCTP(em);
2089
2090         em->pnn = rec->ctdb->pnn;
2091         em->priority_time = rec->priority_time;
2092
2093         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2094         if (ret != 0) {
2095                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2096                 return;
2097         }
2098
2099         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2100         em->node_flags = rec->node_flags;
2101
2102         for (i=0;i<nodemap->num;i++) {
2103                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2104                         em->num_connected++;
2105                 }
2106         }
2107
2108         /* we shouldn't try to win this election if we can't be a recmaster */
2109         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2110                 em->num_connected = 0;
2111                 em->priority_time = timeval_current();
2112         }
2113
2114         talloc_free(nodemap);
2115 }
2116
2117 /*
2118   see if the given election data wins
2119  */
2120 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2121 {
2122         struct election_message myem;
2123         int cmp = 0;
2124
2125         ctdb_election_data(rec, &myem);
2126
2127         /* we can't win if we don't have the recmaster capability */
2128         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2129                 return false;
2130         }
2131
2132         /* we can't win if we are banned */
2133         if (rec->node_flags & NODE_FLAGS_BANNED) {
2134                 return false;
2135         }
2136
2137         /* we can't win if we are stopped */
2138         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2139                 return false;
2140         }
2141
2142         /* we will automatically win if the other node is banned */
2143         if (em->node_flags & NODE_FLAGS_BANNED) {
2144                 return true;
2145         }
2146
2147         /* we will automatically win if the other node is stopped */
2148         if (em->node_flags & NODE_FLAGS_STOPPED) {
2149                 return true;
2150         }
2151
2152         /* try to use the most connected node */
2153         if (cmp == 0) {
2154                 cmp = (int)myem.num_connected - (int)em->num_connected;
2155         }
2156
2157         /* then the longest running node */
2158         if (cmp == 0) {
2159                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2160         }
2161
2162         if (cmp == 0) {
2163                 cmp = (int)myem.pnn - (int)em->pnn;
2164         }
2165
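        /* a positive result means our own election data beats the
           candidate's, so we contest the election */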
2166         return cmp > 0;
2167 }
2168
2169 /*
2170   send out an election request
2171  */
2172 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2173 {
2174         int ret;
2175         TDB_DATA election_data;
2176         struct election_message emsg;
2177         uint64_t srvid;
2178         struct ctdb_context *ctdb = rec->ctdb;
2179
2180         srvid = CTDB_SRVID_RECOVERY;
2181
2182         ctdb_election_data(rec, &emsg);
2183
2184         election_data.dsize = sizeof(struct election_message);
2185         election_data.dptr  = (unsigned char *)&emsg;
2186
2187
2188         /* first we assume we will win the election and set 
2189            recoverymaster to be ourself on the current node
2190          */
2191         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2192         if (ret != 0) {
2193                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2194                 return -1;
2195         }
2196
2197
2198         /* send an election message to all active nodes */
2199         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2200         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2201 }
2202
2203 /*
2204   this function will unban all nodes in the cluster
2205 */
2206 static void unban_all_nodes(struct ctdb_context *ctdb)
2207 {
2208         int ret, i;
2209         struct ctdb_node_map *nodemap;
2210         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2211         
2212         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2213         if (ret != 0) {
2214                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2215                 return;
2216         }
2217
2218         for (i=0;i<nodemap->num;i++) {
2219                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2220                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2221                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2222                                                  nodemap->nodes[i].pnn, 0,
2223                                                  NODE_FLAGS_BANNED);
2224                         if (ret != 0) {
2225                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2226                         }
2227                 }
2228         }
2229
2230         talloc_free(tmp_ctx);
2231 }
2232
2233
2234 /*
2235   we think we are winning the election - send a broadcast election request
2236  */
2237 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2238 {
2239         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2240         int ret;
2241
2242         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2243         if (ret != 0) {
2244                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2245         }
2246
2247         talloc_free(rec->send_election_te);
2248         rec->send_election_te = NULL;
2249 }
2250
2251 /*
2252   handler for memory dumps
2253 */
2254 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2255                              TDB_DATA data, void *private_data)
2256 {
2257         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2258         TDB_DATA *dump;
2259         int ret;
2260         struct srvid_request *rd;
2261
2262         if (data.dsize != sizeof(struct srvid_request)) {
2263                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2264                 talloc_free(tmp_ctx);
2265                 return;
2266         }
2267         rd = (struct srvid_request *)data.dptr;
2268
2269         dump = talloc_zero(tmp_ctx, TDB_DATA);
2270         if (dump == NULL) {
2271                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2272                 talloc_free(tmp_ctx);
2273                 return;
2274         }
2275         ret = ctdb_dump_memory(ctdb, dump);
2276         if (ret != 0) {
2277                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2278                 talloc_free(tmp_ctx);
2279                 return;
2280         }
2281
2282         DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));
2283
2284         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2285         if (ret != 0) {
2286                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2287                 talloc_free(tmp_ctx);
2288                 return;
2289         }
2290
2291         talloc_free(tmp_ctx);
2292 }
2293
2294 /*
2295   handler for getlog
2296 */
2297 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2298                            TDB_DATA data, void *private_data)
2299 {
2300         struct ctdb_get_log_addr *log_addr;
2301         pid_t child;
2302
2303         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2304                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2305                 return;
2306         }
2307         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2308
2309         child = ctdb_fork_no_free_ringbuffer(ctdb);
2310         if (child == (pid_t)-1) {
2311                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2312                 return;
2313         }
2314
2315         if (child == 0) {
2316                 ctdb_set_process_name("ctdb_rec_log_collector");
2317                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2318                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2319                         _exit(1);
2320                 }
2321                 ctdb_collect_log(ctdb, log_addr);
2322                 _exit(0);
2323         }
2324 }
2325
2326 /*
2327   handler for clearlog
2328 */
2329 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2330                              TDB_DATA data, void *private_data)
2331 {
2332         ctdb_clear_log(ctdb);
2333 }
2334
2335 /*
2336   handler for reload_nodes
2337 */
2338 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2339                              TDB_DATA data, void *private_data)
2340 {
2341         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2342
2343         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2344
2345         ctdb_load_nodes_file(rec->ctdb);
2346 }
2347
2348
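/*
  called when a deferred rebalance times out without a successful
  takeover run; trigger one now for the recorded rebalance target nodes
 */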
2349 static void ctdb_rebalance_timeout(struct event_context *ev,
2350                                    struct timed_event *te,
2351                                    struct timeval t, void *p)
2352 {
2353         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2354
2355         if (rec->force_rebalance_nodes == NULL) {
2356                 DEBUG(DEBUG_ERR,
2357                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2358                 return;
2359         }
2360
2361         DEBUG(DEBUG_NOTICE,
2362               ("Rebalance timeout occurred - do takeover run\n"));
2363         do_takeover_run(rec, rec->nodemap, false);
2364 }
2365
2366         
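/*
  handler for node rebalance requests: on the recmaster, remember the
  given PNN as a forced rebalance target and, if the
  deferred_rebalance_on_node_add tunable is set, schedule a deferred
  takeover run for it
 */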
2367 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2368                                         uint64_t srvid,
2369                                         TDB_DATA data, void *private_data)
2370 {
2371         uint32_t pnn;
2372         uint32_t *t;
2373         int len;
2374         uint32_t deferred_rebalance;
2375         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2376
2377         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2378                 return;
2379         }
2380
2381         if (data.dsize != sizeof(uint32_t)) {
2382                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2383                 return;
2384         }
2385
2386         pnn = *(uint32_t *)&data.dptr[0];
2387
2388         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2389
2390         /* Copy any existing list of nodes.  There's probably some
2391          * sort of realloc variant that will do this but we need to
2392          * make sure that freeing the old array also cancels the timer
2393          * event for the timeout... not sure if realloc will do that.
2394          */
2395         len = (rec->force_rebalance_nodes != NULL) ?
2396                 talloc_array_length(rec->force_rebalance_nodes) :
2397                 0;
2398
2399         /* This allows duplicates to be added but they don't cause
2400          * harm.  A call to add a duplicate PNN arguably means that
2401          * the timeout should be reset, so this is the simplest
2402          * solution.
2403          */
2404         t = talloc_zero_array(rec, uint32_t, len+1);
2405         CTDB_NO_MEMORY_VOID(ctdb, t);
2406         if (len > 0) {
2407                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2408         }
2409         t[len] = pnn;
2410
2411         talloc_free(rec->force_rebalance_nodes);
2412
2413         rec->force_rebalance_nodes = t;
2414
2415         /* If configured, setup a deferred takeover run to make sure
2416          * that certain nodes get IPs rebalanced to them.  This will
2417          * be cancelled if a successful takeover run happens before
2418          * the timeout.  Assign tunable value to variable for
2419          * readability.
2420          */
2421         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2422         if (deferred_rebalance != 0) {
2423                 event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2424                                 timeval_current_ofs(deferred_rebalance, 0),
2425                                 ctdb_rebalance_timeout, rec);
2426         }
2427 }
2428
2429
2430
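/*
  handler for "update ip" messages: on the recmaster, record the new
  public IP assignment in the IP assignment tree
 */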
2431 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2432                              TDB_DATA data, void *private_data)
2433 {
2434         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2435         struct ctdb_public_ip *ip;
2436
2437         if (rec->recmaster != rec->ctdb->pnn) {
2438                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2439                 return;
2440         }
2441
2442         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2443                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2444                 return;
2445         }
2446
2447         ip = (struct ctdb_public_ip *)data.dptr;
2448
2449         update_ip_assignment_tree(rec->ctdb, ip);
2450 }
2451
2452
2453 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2454 {
2455         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2456 }
2457
2458 static void reenable_takeover_runs(struct event_context *ev,
2459                                    struct timed_event *te,
2460                                    struct timeval yt, void *p)
2461 {
2462         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2463
2464         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2465         clear_takeover_runs_disable(rec);
2466 }
2467
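/*
  handler for requests to disable takeover runs for a number of seconds
  (a timeout of 0 re-enables them); the reply is our PNN on success or a
  negative errno value on failure
 */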
2468 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2469                                           uint64_t srvid, TDB_DATA data,
2470                                           void *private_data)
2471 {
2472         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2473                                                     struct ctdb_recoverd);
2474         struct srvid_request_data *r;
2475         uint32_t timeout;
2476         TDB_DATA result;
2477         int32_t ret = 0;
2478
2479         /* Validate input data */
2480         if (data.dsize != sizeof(struct srvid_request_data)) {
2481                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2482                                  "expecting %lu\n", (long unsigned)data.dsize,
2483                                  (long unsigned)sizeof(struct srvid_request_data)));
2484                 return;
2485         }
2486         if (data.dptr == NULL) {
2487                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2488                 return;
2489         }
2490
2491         r = (struct srvid_request_data *)data.dptr;
2492         timeout = r->data;
2493
2494         if (timeout == 0) {
2495                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2496                 clear_takeover_runs_disable(rec);
2497                 ret = ctdb_get_pnn(ctdb);
2498                 goto done;
2499         }
2500
2501         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
2502                 DEBUG(DEBUG_ERR,
2503                       ("Refusing to disable takeover runs on inactive node\n"));
2504                 ret = -EHOSTDOWN;
2505                 goto done;
2506         }
2507
2508         if (rec->takeover_run_in_progress) {
2509                 DEBUG(DEBUG_ERR,
2510                       ("Unable to disable takeover runs - in progress\n"));
2511                 ret = -EAGAIN;
2512                 goto done;
2513         }
2514
2515         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2516
2517         /* Clear any old timers */
2518         clear_takeover_runs_disable(rec);
2519
2520         /* When this is non-NULL it indicates that takeover runs are
2521          * disabled.  This context also holds the timeout timer.
2522          */
2523         rec->takeover_runs_disable_ctx = talloc_new(rec);
2524         if (rec->takeover_runs_disable_ctx == NULL) {
2525                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2526                 ret = -ENOMEM;
2527                 goto done;
2528         }
2529
2530         /* Arrange for the timeout to occur */
2531         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2532                         timeval_current_ofs(timeout, 0),
2533                         reenable_takeover_runs,
2534                         rec);
2535
2536         /* Returning our PNN tells the caller that we succeeded */
2537         ret = ctdb_get_pnn(ctdb);
2538 done:
2539         result.dsize = sizeof(int32_t);
2540         result.dptr  = (uint8_t *)&ret;
2541         srvid_request_reply(ctdb, (struct srvid_request *)r, result);
2542 }
2543
2544 /* Backward compatibility for this SRVID - call
2545  * disable_takeover_runs_handler() instead
2546  */
2547 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2548                                      TDB_DATA data, void *private_data)
2549 {
2550         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2551                                                     struct ctdb_recoverd);
2552         TDB_DATA data2;
2553         struct srvid_request_data *req;
2554
2555         if (data.dsize != sizeof(uint32_t)) {
2556                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2557                                  "expecting %lu\n", (long unsigned)data.dsize,
2558                                  (long unsigned)sizeof(uint32_t)));
2559                 return;
2560         }
2561         if (data.dptr == NULL) {
2562                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2563                 return;
2564         }
2565
2566         req = talloc(ctdb, struct srvid_request_data);
2567         CTDB_NO_MEMORY_VOID(ctdb, req);
2568
2569         req->srvid = 0; /* No reply */
2570         req->pnn = -1;
2571         req->data = *((uint32_t *)data.dptr); /* Timeout */
2572
2573         data2.dsize = sizeof(*req);
2574         data2.dptr = (uint8_t *)req;
2575
2576         disable_takeover_runs_handler(rec->ctdb,
2577                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2578                                       data2, rec);
2579 }
2580
2581 /*
2582   handler for ip reallocate, just add it to the list of requests and 
2583   handle this later in the monitor_cluster loop so we do not recurse
2584   with other requests to takeover_run()
2585 */
2586 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2587                                   TDB_DATA data, void *private_data)
2588 {
2589         struct srvid_request *request;
2590         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2591                                                     struct ctdb_recoverd);
2592
2593         if (data.dsize != sizeof(struct srvid_request)) {
2594                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2595                 return;
2596         }
2597
2598         request = (struct srvid_request *)data.dptr;
2599
2600         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2601 }
2602
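/*
  process the queued ip reallocation requests: refresh the public IP
  lists, do a takeover run and reply to every waiting requester with our
  PNN on success or -1 on failure
 */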
2603 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2604                                           struct ctdb_recoverd *rec)
2605 {
2606         TDB_DATA result;
2607         int32_t ret;
2608         uint32_t culprit;
2609
2610         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2611
2612         /* update the list of public ips that a node can handle for
2613            all connected nodes
2614         */
2615         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2616         if (ret != 0) {
2617                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2618                                  culprit));
2619                 rec->need_takeover_run = true;
2620         }
2621         if (ret == 0) {
2622                 if (do_takeover_run(rec, rec->nodemap, false)) {
2623                         ret = ctdb_get_pnn(ctdb);
2624                 } else {
2625                         ret = -1;
2626                 }
2627         }
2628
2629         result.dsize = sizeof(int32_t);
2630         result.dptr  = (uint8_t *)&ret;
2631
2632         srvid_requests_reply(ctdb, &rec->reallocate_requests, result);
2633 }
2634
2635
2636 /*
2637   handler for recovery master elections
2638 */
2639 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2640                              TDB_DATA data, void *private_data)
2641 {
2642         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2643         int ret;
2644         struct election_message *em = (struct election_message *)data.dptr;
2645         TALLOC_CTX *mem_ctx;
2646
2647         /* Ignore election packets from ourself */
2648         if (ctdb->pnn == em->pnn) {
2649                 return;
2650         }
2651
2652         /* we got an election packet - update the timeout for the election */
2653         talloc_free(rec->election_timeout);
2654         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2655                                                 fast_start ?
2656                                                 timeval_current_ofs(0, 500000) :
2657                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2658                                                 ctdb_election_timeout, rec);
2659
2660         mem_ctx = talloc_new(ctdb);
2661
2662         /* someone called an election. Check their election data and,
2663            if we would rather be the elected node ourselves, send a new
2664            election message to all other nodes
2665          */
2666         if (ctdb_election_win(rec, em)) {
2667                 if (!rec->send_election_te) {
2668                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2669                                                                 timeval_current_ofs(0, 500000),
2670                                                                 election_send_request, rec);
2671                 }
2672                 talloc_free(mem_ctx);
2673                 /*unban_all_nodes(ctdb);*/
2674                 return;
2675         }
2676         
2677         /* we didn't win */
2678         talloc_free(rec->send_election_te);
2679         rec->send_election_te = NULL;
2680
2681         if (ctdb->tunable.verify_recovery_lock != 0) {
2682                 /* release the recmaster lock */
2683                 if (em->pnn != ctdb->pnn &&
2684                     ctdb->recovery_lock_fd != -1) {
2685                         close(ctdb->recovery_lock_fd);
2686                         ctdb->recovery_lock_fd = -1;
2687                         unban_all_nodes(ctdb);
2688                 }
2689         }
2690
2691         /* ok, let that guy become recmaster then */
2692         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2693         if (ret != 0) {
2694                 DEBUG(DEBUG_ERR, (__location__ " failed to set new recmaster\n"));
2695                 talloc_free(mem_ctx);
2696                 return;
2697         }
2698
2699         talloc_free(mem_ctx);
2700         return;
2701 }
2702
2703
2704 /*
2705   force the start of the election process
2706  */
2707 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2708                            struct ctdb_node_map *nodemap)
2709 {
2710         int ret;
2711         struct ctdb_context *ctdb = rec->ctdb;
2712
2713         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2714
2715         /* set all nodes to recovery mode to stop all internode traffic */
2716         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2717         if (ret != 0) {
2718                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2719                 return;
2720         }
2721
2722         talloc_free(rec->election_timeout);
2723         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2724                                                 fast_start ?
2725                                                 timeval_current_ofs(0, 500000) :
2726                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2727                                                 ctdb_election_timeout, rec);
2728
2729         ret = send_election_request(rec, pnn);
2730         if (ret!=0) {
2731                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election\n"));
2732                 return;
2733         }
2734
2735         /* wait for a few seconds to collect all responses */
2736         ctdb_wait_election(rec);
2737 }
2738
2739
2740
2741 /*
2742   handler for when a node changes its flags
2743 */
2744 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2745                             TDB_DATA data, void *private_data)
2746 {
2747         int ret;
2748         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2749         struct ctdb_node_map *nodemap=NULL;
2750         TALLOC_CTX *tmp_ctx;
2751         int i;
2752         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2753         int disabled_flag_changed;
2754
2755         if (data.dsize != sizeof(*c)) {
2756                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2757                 return;
2758         }
2759
2760         tmp_ctx = talloc_new(ctdb);
2761         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2762
2763         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2764         if (ret != 0) {
2765                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2766                 talloc_free(tmp_ctx);
2767                 return;         
2768         }
2769
2770
2771         for (i=0;i<nodemap->num;i++) {
2772                 if (nodemap->nodes[i].pnn == c->pnn) break;
2773         }
2774
2775         if (i == nodemap->num) {
2776                 DEBUG(DEBUG_CRIT,(__location__ " Flag change for non-existent node %u\n", c->pnn));
2777                 talloc_free(tmp_ctx);
2778                 return;
2779         }
2780
2781         if (c->old_flags != c->new_flags) {
2782                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2783         }
2784
2785         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2786
2787         nodemap->nodes[i].flags = c->new_flags;
2788
2789         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2790                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2791
2792         if (ret == 0) {
2793                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2794                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2795         }
2796         
2797         if (ret == 0 &&
2798             ctdb->recovery_master == ctdb->pnn &&
2799             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2800                 /* Only do the takeover run if the permanently-disabled or
2801                    unhealthy flags changed, since these cause an ip failover
2802                    but not a recovery.
2803                    If the node became disconnected or banned this also leads
2804                    to an ip address failover, but that is handled during
2805                    recovery.
2806                 */
2807                 if (disabled_flag_changed) {
2808                         rec->need_takeover_run = true;
2809                 }
2810         }
2811
2812         talloc_free(tmp_ctx);
2813 }
2814
2815 /*
2816   handler for when we need to push out flag changes to all other nodes
2817 */
2818 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2819                             TDB_DATA data, void *private_data)
2820 {
2821         int ret;
2822         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2823         struct ctdb_node_map *nodemap=NULL;
2824         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2825         uint32_t recmaster;
2826         uint32_t *nodes;
2827
2828         /* find the recovery master */
2829         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2830         if (ret != 0) {
2831                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2832                 talloc_free(tmp_ctx);
2833                 return;
2834         }
2835
2836         /* read the node flags from the recmaster */
2837         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2838         if (ret != 0) {
2839                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2840                 talloc_free(tmp_ctx);
2841                 return;
2842         }
2843         if (c->pnn >= nodemap->num) {
2844                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2845                 talloc_free(tmp_ctx);
2846                 return;
2847         }
2848
2849         /* send the flags update to all connected nodes */
2850         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2851
2852         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2853                                       nodes, 0, CONTROL_TIMEOUT(),
2854                                       false, data,
2855                                       NULL, NULL,
2856                                       NULL) != 0) {
2857                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2858
2859                 talloc_free(tmp_ctx);
2860                 return;
2861         }
2862
2863         talloc_free(tmp_ctx);
2864 }
2865
2866
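/* Shared state for the async getrecmode fan-out below: "count" tracks how
   many controls are still outstanding and "status" records the worst result
   seen so far (OK, failed, or recovery needed). */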
2867 struct verify_recmode_normal_data {
2868         uint32_t count;
2869         enum monitor_result status;
2870 };
2871
2872 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2873 {
2874         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2875
2876
2877         /* one more node has responded with recmode data*/
2878         rmdata->count--;
2879
2880         /* if we failed to get the recmode, then return an error and let
2881            the main loop try again.
2882         */
2883         if (state->state != CTDB_CONTROL_DONE) {
2884                 if (rmdata->status == MONITOR_OK) {
2885                         rmdata->status = MONITOR_FAILED;
2886                 }
2887                 return;
2888         }
2889
2890         /* if we got a response, then the recmode will be stored in the
2891            status field
2892         */
2893         if (state->status != CTDB_RECOVERY_NORMAL) {
2894                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2895                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2896         }
2897
2898         return;
2899 }
2900
2901
2902 /* verify that all nodes are in normal recovery mode */
2903 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2904 {
2905         struct verify_recmode_normal_data *rmdata;
2906         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2907         struct ctdb_client_control_state *state;
2908         enum monitor_result status;
2909         int j;
2910         
2911         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2912         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2913         rmdata->count  = 0;
2914         rmdata->status = MONITOR_OK;
2915
2916         /* loop over all active nodes and send an async getrecmode call to 
2917            them*/
2918         for (j=0; j<nodemap->num; j++) {
2919                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2920                         continue;
2921                 }
2922                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2923                                         CONTROL_TIMEOUT(), 
2924                                         nodemap->nodes[j].pnn);
2925                 if (state == NULL) {
2926                         /* we failed to send the control, treat this as 
2927                            an error and try again next iteration
2928                         */                      
2929                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2930                         talloc_free(mem_ctx);
2931                         return MONITOR_FAILED;
2932                 }
2933
2934                 /* set up the callback functions */
2935                 state->async.fn = verify_recmode_normal_callback;
2936                 state->async.private_data = rmdata;
2937
2938                 /* one more control to wait for to complete */
2939                 rmdata->count++;
2940         }
2941
2942
2943         /* now wait for up to the maximum number of seconds allowed
2944            or until all nodes we expect a response from have replied
2945         */
2946         while (rmdata->count > 0) {
2947                 event_loop_once(ctdb->ev);
2948         }
2949
2950         status = rmdata->status;
2951         talloc_free(mem_ctx);
2952         return status;
2953 }
2954
2955
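/* Shared state for the async getrecmaster fan-out below: "pnn" is the
   recmaster every node is expected to report, "count" tracks outstanding
   replies and "status" records the verdict. */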
2956 struct verify_recmaster_data {
2957         struct ctdb_recoverd *rec;
2958         uint32_t count;
2959         uint32_t pnn;
2960         enum monitor_result status;
2961 };
2962
2963 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
2964 {
2965         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
2966
2967
2968         /* one more node has responded with recmaster data*/
2969         rmdata->count--;
2970
2971         /* if we failed to get the recmaster, then return an error and let
2972            the main loop try again.
2973         */
2974         if (state->state != CTDB_CONTROL_DONE) {
2975                 if (rmdata->status == MONITOR_OK) {
2976                         rmdata->status = MONITOR_FAILED;
2977                 }
2978                 return;
2979         }
2980
2981         /* if we got a response, then the recmaster will be stored in the
2982            status field
2983         */
2984         if (state->status != rmdata->pnn) {
2985                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
2986                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
2987                 rmdata->status = MONITOR_ELECTION_NEEDED;
2988         }
2989
2990         return;
2991 }
2992
2993
2994 /* verify that all nodes agree that we are the recmaster */
2995 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
2996 {
2997         struct ctdb_context *ctdb = rec->ctdb;
2998         struct verify_recmaster_data *rmdata;
2999         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3000         struct ctdb_client_control_state *state;
3001         enum monitor_result status;
3002         int j;
3003         
3004         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3005         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3006         rmdata->rec    = rec;
3007         rmdata->count  = 0;
3008         rmdata->pnn    = pnn;
3009         rmdata->status = MONITOR_OK;
3010
3011         /* loop over all active nodes and send an async getrecmaster call to 
3012            them*/
3013         for (j=0; j<nodemap->num; j++) {
3014                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3015                         continue;
3016                 }
3017                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3018                                         CONTROL_TIMEOUT(),
3019                                         nodemap->nodes[j].pnn);
3020                 if (state == NULL) {
3021                         /* we failed to send the control, treat this as 
3022                            an error and try again next iteration
3023                         */                      
3024                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3025                         talloc_free(mem_ctx);
3026                         return MONITOR_FAILED;
3027                 }
3028
3029                 /* set up the callback functions */
3030                 state->async.fn = verify_recmaster_callback;
3031                 state->async.private_data = rmdata;
3032
3033                 /* one more control to wait for to complete */
3034                 rmdata->count++;
3035         }
3036
3037
3038         /* now wait for up to the maximum number of seconds allowed
3039            or until all nodes we expect a response from have replied
3040         */
3041         while (rmdata->count > 0) {
3042                 event_loop_once(ctdb->ev);
3043         }
3044
3045         status = rmdata->status;
3046         talloc_free(mem_ctx);
3047         return status;
3048 }
3049
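/* Compare the interface list reported by the local daemon with the copy
   cached in rec->ifaces from the previous iteration.  Any change in the
   interface count, a name or a link state is reported to the caller (which
   uses it to force a takeover run), and the cached list is replaced with
   the fresh one. */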
3050 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3051                                     struct ctdb_recoverd *rec)
3052 {
3053         struct ctdb_control_get_ifaces *ifaces = NULL;
3054         TALLOC_CTX *mem_ctx;
3055         bool ret = false;
3056
3057         mem_ctx = talloc_new(NULL);
3058
3059         /* Read the interfaces from the local node */
3060         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3061                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3062                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3063                 /* We could return an error.  However, this will be
3064                  * rare so we'll decide that the interfaces have
3065                  * actually changed, just in case.
3066                  */
3067                 talloc_free(mem_ctx);
3068                 return true;
3069         }
3070
3071         if (!rec->ifaces) {
3072                 /* We haven't been here before so things have changed */
3073                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3074                 ret = true;
3075         } else if (rec->ifaces->num != ifaces->num) {
3076                 /* Number of interfaces has changed */
3077                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3078                                      rec->ifaces->num, ifaces->num));
3079                 ret = true;
3080         } else {
3081                 /* See if interface names or link states have changed */
3082                 int i;
3083                 for (i = 0; i < rec->ifaces->num; i++) {
3084                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3085                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3086                                 DEBUG(DEBUG_NOTICE,
3087                                       ("Interface in slot %d changed: %s => %s\n",
3088                                        i, iface->name, ifaces->ifaces[i].name));
3089                                 ret = true;
3090                                 break;
3091                         }
3092                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3093                                 DEBUG(DEBUG_NOTICE,
3094                                       ("Interface %s changed state: %d => %d\n",
3095                                        iface->name, iface->link_state,
3096                                        ifaces->ifaces[i].link_state));
3097                                 ret = true;
3098                                 break;
3099                         }
3100                 }
3101         }
3102
3103         talloc_free(rec->ifaces);
3104         rec->ifaces = talloc_steal(rec, ifaces);
3105
3106         talloc_free(mem_ctx);
3107         return ret;
3108 }
3109
3110 /* called to check that the local allocation of public ip addresses is ok.
3111 */
3112 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3113 {
3114         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3115         struct ctdb_uptime *uptime1 = NULL;
3116         struct ctdb_uptime *uptime2 = NULL;
3117         int ret, j;
3118         bool need_takeover_run = false;
3119
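        /* The uptime is sampled twice, before and after the interface check.
         * If the recovery start/finish timestamps differ between the two
         * samples, or a recovery is still in progress, the IP checks below
         * are skipped for this iteration to avoid racing with a recovery. */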
3120         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3121                                 CTDB_CURRENT_NODE, &uptime1);
3122         if (ret != 0) {
3123                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3124                 talloc_free(mem_ctx);
3125                 return -1;
3126         }
3127
3128         if (interfaces_have_changed(ctdb, rec)) {
3129                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3130                                      "local node %u - force takeover run\n",
3131                                      pnn));
3132                 need_takeover_run = true;
3133         }
3134
3135         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3136                                 CTDB_CURRENT_NODE, &uptime2);
3137         if (ret != 0) {
3138                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3139                 talloc_free(mem_ctx);
3140                 return -1;
3141         }
3142
3143         /* skip the check if the startrecovery time has changed */
3144         if (timeval_compare(&uptime1->last_recovery_started,
3145                             &uptime2->last_recovery_started) != 0) {
3146                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3147                 talloc_free(mem_ctx);
3148                 return 0;
3149         }
3150
3151         /* skip the check if the endrecovery time has changed */
3152         if (timeval_compare(&uptime1->last_recovery_finished,
3153                             &uptime2->last_recovery_finished) != 0) {
3154                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3155                 talloc_free(mem_ctx);
3156                 return 0;
3157         }
3158
3159         /* skip the check if we have started but not finished recovery */
3160         if (timeval_compare(&uptime1->last_recovery_finished,
3161                             &uptime1->last_recovery_started) != 1) {
3162                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3163                 talloc_free(mem_ctx);
3164
3165                 return 0;
3166         }
3167
3168         /* verify that we have the ip addresses we should have
3169            and that we don't have ones we shouldn't have.
3170            if we find an inconsistency we set recmode to
3171            active on the local node and wait for the recmaster
3172            to do a full blown recovery.
3173            also, if an address has pnn == -1 and we are healthy and can
3174            host the ip, we request an ip reallocation.
3175         */
3176         if (ctdb->tunable.disable_ip_failover == 0) {
3177                 struct ctdb_all_public_ips *ips = NULL;
3178
3179                 /* read the *available* IPs from the local node */
3180                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3181                 if (ret != 0) {
3182                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3183                         talloc_free(mem_ctx);
3184                         return -1;
3185                 }
3186
3187                 for (j=0; j<ips->num; j++) {
3188                         if (ips->ips[j].pnn == -1 &&
3189                             nodemap->nodes[pnn].flags == 0) {
3190                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3191                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3192                                 need_takeover_run = true;
3193                         }
3194                 }
3195
3196                 talloc_free(ips);
3197
3198                 /* read the *known* IPs from the local node */
3199                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3200                 if (ret != 0) {
3201                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3202                         talloc_free(mem_ctx);
3203                         return -1;
3204                 }
3205
3206                 for (j=0; j<ips->num; j++) {
3207                         if (ips->ips[j].pnn == pnn) {
3208                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3209                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3210                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3211                                         need_takeover_run = true;
3212                                 }
3213                         } else {
3214                                 if (ctdb->do_checkpublicip &&
3215                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3216
3217                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3218                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3219
3220                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3221                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3222                                         }
3223                                 }
3224                         }
3225                 }
3226         }
3227
3228         if (need_takeover_run) {
3229                 struct srvid_request rd;
3230                 TDB_DATA data;
3231
3232                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3233
3234                 rd.pnn = ctdb->pnn;
3235                 rd.srvid = 0;
3236                 data.dptr = (uint8_t *)&rd;
3237                 data.dsize = sizeof(rd);
3238
3239                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3240                 if (ret != 0) {
3241                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3242                 }
3243         }
3244         talloc_free(mem_ctx);
3245         return 0;
3246 }
3247
3248
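/* Callback for the async CTDB_CONTROL_GET_NODEMAP fan-out: stash each remote
   node's reply in the remote_nodemaps array, indexed by that node's PNN. */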
3249 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3250 {
3251         struct ctdb_node_map **remote_nodemaps = callback_data;
3252
3253         if (node_pnn >= ctdb->num_nodes) {
3254                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3255                 return;
3256         }
3257
3258         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3259
3260 }
3261
3262 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3263         struct ctdb_node_map *nodemap,
3264         struct ctdb_node_map **remote_nodemaps)
3265 {
3266         uint32_t *nodes;
3267
3268         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3269         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3270                                         nodes, 0,
3271                                         CONTROL_TIMEOUT(), false, tdb_null,
3272                                         async_getnodemap_callback,
3273                                         NULL,
3274                                         remote_nodemaps) != 0) {
3275                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3276
3277                 return -1;
3278         }
3279
3280         return 0;
3281 }
3282
3283 enum reclock_child_status { RECLOCK_CHECKING, RECLOCK_OK, RECLOCK_FAILED, RECLOCK_TIMEOUT};
3284 struct ctdb_check_reclock_state {
3285         struct ctdb_context *ctdb;
3286         struct timeval start_time;
3287         int fd[2];
3288         pid_t child;
3289         struct timed_event *te;
3290         struct fd_event *fde;
3291         enum reclock_child_status status;
3292 };
3293
3294 /* when we free the reclock state we must kill any child process.
3295 */
3296 static int check_reclock_destructor(struct ctdb_check_reclock_state *state)
3297 {
3298         struct ctdb_context *ctdb = state->ctdb;
3299
3300         ctdb_ctrl_report_recd_lock_latency(ctdb, CONTROL_TIMEOUT(), timeval_elapsed(&state->start_time));
3301
3302         if (state->fd[0] != -1) {
3303                 close(state->fd[0]);
3304                 state->fd[0] = -1;
3305         }
3306         if (state->fd[1] != -1) {
3307                 close(state->fd[1]);
3308                 state->fd[1] = -1;
3309         }
3310         ctdb_kill(ctdb, state->child, SIGKILL);
3311         return 0;
3312 }
3313
3314 /*
3315   called if our check_reclock child times out. this would happen if
3316   i/o to the reclock file blocks.
3317  */
3318 static void ctdb_check_reclock_timeout(struct event_context *ev, struct timed_event *te, 
3319                                          struct timeval t, void *private_data)
3320 {
3321         struct ctdb_check_reclock_state *state = talloc_get_type(private_data, 
3322                                            struct ctdb_check_reclock_state);
3323
3324         DEBUG(DEBUG_ERR,(__location__ " check_reclock child process hung/timed out - CFS slow to grant locks?\n"));
3325         state->status = RECLOCK_TIMEOUT;
3326 }
3327
3328 /* this is called when the child process has completed checking the reclock
3329    file and has written data back to us through the pipe.
3330 */
3331 static void reclock_child_handler(struct event_context *ev, struct fd_event *fde, 
3332                              uint16_t flags, void *private_data)
3333 {
3334         struct ctdb_check_reclock_state *state= talloc_get_type(private_data, 
3335                                              struct ctdb_check_reclock_state);
3336         char c = 0;
3337         int ret;
3338
3339         /* we got a response from our child process so we can abort the
3340            timeout.
3341         */
3342         talloc_free(state->te);
3343         state->te = NULL;
3344
3345         ret = read(state->fd[0], &c, 1);
3346         if (ret != 1 || c != RECLOCK_OK) {
3347                 DEBUG(DEBUG_ERR,(__location__ " reclock child process returned error %d\n", c));
3348                 state->status = RECLOCK_FAILED;
3349
3350                 return;
3351         }
3352
3353         state->status = RECLOCK_OK;
3354         return;
3355 }
3356
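/* Verify that the recovery lock we are holding is still healthy by letting a
   child process read from it; see the timeout and pipe handlers above.
   Returns -1 if we do not hold the lock or the child reports an error,
   0 otherwise. */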
3357 static int check_recovery_lock(struct ctdb_context *ctdb)
3358 {
3359         int ret;
3360         struct ctdb_check_reclock_state *state;
3361         pid_t parent = getpid();
3362
3363         if (ctdb->recovery_lock_fd == -1) {
3364                 DEBUG(DEBUG_CRIT,("recovery master doesn't have the recovery lock\n"));
3365                 return -1;
3366         }
3367
3368         state = talloc(ctdb, struct ctdb_check_reclock_state);
3369         CTDB_NO_MEMORY(ctdb, state);
3370
3371         state->ctdb = ctdb;
3372         state->start_time = timeval_current();
3373         state->status = RECLOCK_CHECKING;
3374         state->fd[0] = -1;
3375         state->fd[1] = -1;
3376
3377         ret = pipe(state->fd);
3378         if (ret != 0) {
3379                 talloc_free(state);
3380                 DEBUG(DEBUG_CRIT,(__location__ " Failed to open pipe for check_reclock child\n"));
3381                 return -1;
3382         }
3383
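        /* Run the check in a child process: the child pread()s one byte from
         * the recovery lock fd and writes a status byte back through the
         * pipe, while the parent drives the event loop until the child
         * answers or the 15 second timeout fires, so a hung cluster
         * filesystem cannot block the recovery daemon itself. */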
3384         state->child = ctdb_fork(ctdb);
3385         if (state->child == (pid_t)-1) {
3386                 DEBUG(DEBUG_CRIT,(__location__ " fork() failed in check_reclock child\n"));
3387                 close(state->fd[0]);
3388                 state->fd[0] = -1;
3389                 close(state->fd[1]);
3390                 state->fd[1] = -1;
3391                 talloc_free(state);
3392                 return -1;
3393         }
3394
3395         if (state->child == 0) {
3396                 char cc = RECLOCK_OK;
3397                 close(state->fd[0]);
3398                 state->fd[0] = -1;
3399
3400                 ctdb_set_process_name("ctdb_rec_reclock");
3401                 debug_extra = talloc_asprintf(NULL, "recovery-lock:");
3402                 if (pread(ctdb->recovery_lock_fd, &cc, 1, 0) == -1) {
3403                         DEBUG(DEBUG_CRIT,("failed read from recovery_lock_fd - %s\n", strerror(errno)));
3404                         cc = RECLOCK_FAILED;
3405                 }
3406
3407                 write(state->fd[1], &cc, 1);
3408                 /* make sure we die when our parent dies */
3409                 while (ctdb_kill(ctdb, parent, 0) == 0 || errno != ESRCH) {
3410                         sleep(5);
3411                 }
3412                 _exit(0);
3413         }
3414         close(state->fd[1]);
3415         state->fd[1] = -1;
3416         set_close_on_exec(state->fd[0]);
3417
3418         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d for check_recovery_lock\n", state->fd[0]));
3419
3420         talloc_set_destructor(state, check_reclock_destructor);
3421
3422         state->te = event_add_timed(ctdb->ev, state, timeval_current_ofs(15, 0),
3423                                     ctdb_check_reclock_timeout, state);
3424         if (state->te == NULL) {
3425                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create a timed event for reclock child\n"));
3426                 talloc_free(state);
3427                 return -1;
3428         }
3429
3430         state->fde = event_add_fd(ctdb->ev, state, state->fd[0],
3431                                 EVENT_FD_READ,
3432                                 reclock_child_handler,
3433                                 (void *)state);
3434
3435         if (state->fde == NULL) {
3436                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create an fd event for reclock child\n"));
3437                 talloc_free(state);
3438                 return -1;
3439         }
3440         tevent_fd_set_auto_close(state->fde);
3441
3442         while (state->status == RECLOCK_CHECKING) {
3443                 event_loop_once(ctdb->ev);
3444         }
3445
3446         if (state->status == RECLOCK_FAILED) {
3447                 DEBUG(DEBUG_ERR,(__location__ " reclock child failed when checking file\n"));
3448                 close(ctdb->recovery_lock_fd);
3449                 ctdb->recovery_lock_fd = -1;
3450                 talloc_free(state);
3451                 return -1;
3452         }
3453
3454         talloc_free(state);
3455         return 0;
3456 }
3457
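/* Fetch the current recovery lock file setting from the main daemon and
   update our local copy.  If the reclock has been disabled or its path has
   changed, any open lock fd is closed and recovery lock verification is
   switched off. */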
3458 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3459 {
3460         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3461         const char *reclockfile;
3462
3463         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3464                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3465                 talloc_free(tmp_ctx);
3466                 return -1;      
3467         }
3468
3469         if (reclockfile == NULL) {
3470                 if (ctdb->recovery_lock_file != NULL) {
3471                         DEBUG(DEBUG_ERR,("Reclock file disabled\n"));
3472                         talloc_free(ctdb->recovery_lock_file);
3473                         ctdb->recovery_lock_file = NULL;
3474                         if (ctdb->recovery_lock_fd != -1) {
3475                                 close(ctdb->recovery_lock_fd);
3476                                 ctdb->recovery_lock_fd = -1;
3477                         }
3478                 }
3479                 ctdb->tunable.verify_recovery_lock = 0;
3480                 talloc_free(tmp_ctx);
3481                 return 0;
3482         }
3483
3484         if (ctdb->recovery_lock_file == NULL) {
3485                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3486                 if (ctdb->recovery_lock_fd != -1) {
3487                         close(ctdb->recovery_lock_fd);
3488                         ctdb->recovery_lock_fd = -1;
3489                 }
3490                 talloc_free(tmp_ctx);
3491                 return 0;
3492         }
3493
3494
3495         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3496                 talloc_free(tmp_ctx);
3497                 return 0;
3498         }
3499
3500         talloc_free(ctdb->recovery_lock_file);
3501         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3502         ctdb->tunable.verify_recovery_lock = 0;
3503         if (ctdb->recovery_lock_fd != -1) {
3504                 close(ctdb->recovery_lock_fd);
3505                 ctdb->recovery_lock_fd = -1;
3506         }
3507
3508         talloc_free(tmp_ctx);
3509         return 0;
3510 }
3511
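/* One pass of the recovery daemon's monitoring loop: check that the main
   daemon is alive, refresh tunables, the reclock setting, the nodemap and
   the vnnmap, make sure a sane recovery master exists, and - if we are the
   recmaster - verify cluster-wide consistency, forcing an election, a
   recovery or a takeover run when something is out of step. */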
3512 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3513                       TALLOC_CTX *mem_ctx)
3514 {
3515         uint32_t pnn;
3516         struct ctdb_node_map *nodemap=NULL;
3517         struct ctdb_node_map *recmaster_nodemap=NULL;
3518         struct ctdb_node_map **remote_nodemaps=NULL;
3519         struct ctdb_vnn_map *vnnmap=NULL;
3520         struct ctdb_vnn_map *remote_vnnmap=NULL;
3521         int32_t debug_level;
3522         int i, j, ret;
3523         bool self_ban;
3524
3525
3526         /* verify that the main daemon is still running */
3527         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3528                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3529                 exit(-1);
3530         }
3531
3532         /* ping the local daemon to tell it we are alive */
3533         ctdb_ctrl_recd_ping(ctdb);
3534
3535         if (rec->election_timeout) {
3536                 /* an election is in progress */
3537                 return;
3538         }
3539
3540         /* read the debug level from the parent and update locally */
3541         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3542         if (ret !=0) {
3543                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3544                 return;
3545         }
3546         LogLevel = debug_level;
3547
3548         /* get relevant tunables */
3549         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3550         if (ret != 0) {
3551                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3552                 return;
3553         }
3554
3555         /* get the current recovery lock file from the server */
3556         if (update_recovery_lock_file(ctdb) != 0) {
3557                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3558                 return;
3559         }
3560
3561         /* Make sure that if recovery lock verification becomes disabled,
3562            we close the file
3563         */
3564         if (ctdb->tunable.verify_recovery_lock == 0) {
3565                 if (ctdb->recovery_lock_fd != -1) {
3566                         close(ctdb->recovery_lock_fd);
3567                         ctdb->recovery_lock_fd = -1;
3568                 }
3569         }
3570
3571         pnn = ctdb_get_pnn(ctdb);
3572
3573         /* get the vnnmap */
3574         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3575         if (ret != 0) {
3576                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3577                 return;
3578         }
3579
3580
3581         /* get number of nodes */
3582         if (rec->nodemap) {
3583                 talloc_free(rec->nodemap);
3584                 rec->nodemap = NULL;
3585                 nodemap=NULL;
3586         }
3587         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3588         if (ret != 0) {
3589                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3590                 return;
3591         }
3592         nodemap = rec->nodemap;
3593
3594         /* remember our own node flags */
3595         rec->node_flags = nodemap->nodes[pnn].flags;
3596
3597         ban_misbehaving_nodes(rec, &self_ban);
3598         if (self_ban) {
3599                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3600                 return;
3601         }
3602
3603         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3604            also frozen and that the recmode is set to active.
3605         */
3606         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3607                 /* If this node has become inactive then we want to
3608                  * reduce the chances of it taking over the recovery
3609                  * master role when it becomes active again.  This
3610                  * helps to stabilise the recovery master role so that
3611                  * it stays on the most stable node.
3612                  */
3613                 rec->priority_time = timeval_current();
3614
3615                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3616                 if (ret != 0) {
3617                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3618                 }
3619                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3620                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3621
3622                         ret = ctdb_ctrl_freeze_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, 1);
3623                         if (ret != 0) {
3624                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3625                                 return;
3626                         }
3627                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3628                         if (ret != 0) {
3629                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3630
3631                                 return;
3632                         }
3633                 }
3634
3635                 /* If this node is stopped or banned then it is not the recovery
3636                  * master, so don't do anything. This prevents a stopped or banned
3637                  * node from starting an election and sending unnecessary controls.
3638                  */
3639                 return;
3640         }
3641
3642         /* check which node is the recovery master */
3643         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3644         if (ret != 0) {
3645                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3646                 return;
3647         }
3648
3649         /* If we are not the recmaster then do some housekeeping */
3650         if (rec->recmaster != pnn) {
3651                 /* Ignore any IP reallocate requests - only recmaster
3652                  * processes them
3653                  */
3654                 TALLOC_FREE(rec->reallocate_requests);
3655                 /* Clear any nodes that should be force rebalanced in
3656                  * the next takeover run.  If the recovery master role
3657                  * has moved then we don't want to process these some
3658                  * time in the future.
3659                  */
3660                 TALLOC_FREE(rec->force_rebalance_nodes);
3661         }
3662
3663         /* This is a special case.  When the recovery daemon is started, recmaster
3664          * is set to -1.  If the node is not started in stopped state, then
3665          * start an election to decide the recovery master.
3666          */
3667         if (rec->recmaster == (uint32_t)-1) {
3668                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3669                 force_election(rec, pnn, nodemap);
3670                 return;
3671         }
3672
3673         /* update the capabilities for all nodes */
3674         ret = update_capabilities(ctdb, nodemap);
3675         if (ret != 0) {
3676                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3677                 return;
3678         }
3679
3680         /*
3681          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3682          * but we have, then force an election and try to become the new
3683          * recmaster.
3684          */
3685         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3686             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3687              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3688                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3689                                   " but we (node %u) have - force an election\n",
3690                                   rec->recmaster, pnn));
3691                 force_election(rec, pnn, nodemap);
3692                 return;
3693         }
3694
3695         /* count how many active nodes there are */
3696         rec->num_active    = 0;
3697         rec->num_lmasters  = 0;
3698         rec->num_connected = 0;
3699         for (i=0; i<nodemap->num; i++) {
3700                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3701                         rec->num_active++;
3702                         if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
3703                                 rec->num_lmasters++;
3704                         }
3705                 }
3706                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3707                         rec->num_connected++;
3708                 }
3709         }
3710
3711
3712         /* verify that the recmaster node is still active */
3713         for (j=0; j<nodemap->num; j++) {
3714                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3715                         break;
3716                 }
3717         }
3718
3719         if (j == nodemap->num) {
3720                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3721                 force_election(rec, pnn, nodemap);
3722                 return;
3723         }
3724
3725         /* if recovery master is disconnected we must elect a new recmaster */
3726         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3727                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3728                 force_election(rec, pnn, nodemap);
3729                 return;
3730         }
3731
3732         /* get nodemap from the recovery master to check if it is inactive */
3733         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3734                                    mem_ctx, &recmaster_nodemap);
3735         if (ret != 0) {
3736                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3737                           nodemap->nodes[j].pnn));
3738                 return;
3739         }
3740
3741
3742         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3743             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3744                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3745                 /*
3746                  * update our nodemap to carry the recmaster's notion of
3747                  * its own flags, so that we don't keep freezing the
3748                  * inactive recmaster node...
3749                  */
3750                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3751                 force_election(rec, pnn, nodemap);
3752                 return;
3753         }
3754
3755         /* verify that we have all ip addresses we should have and we don't
3756          * have addresses we shouldn't have.
3757          */ 
3758         if (ctdb->tunable.disable_ip_failover == 0 &&
3759             rec->takeover_runs_disable_ctx == NULL) {
3760                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3761                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3762                 }
3763         }
3764
3765
3766         /* if we are not the recmaster then we do not need to check
3767            if recovery is needed
3768          */
3769         if (pnn != rec->recmaster) {
3770                 return;
3771         }
3772
3773
3774         /* ensure our local copies of flags are right */
3775         ret = update_local_flags(rec, nodemap);
3776         if (ret == MONITOR_ELECTION_NEEDED) {
3777                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3778                 force_election(rec, pnn, nodemap);
3779                 return;
3780         }
3781         if (ret != MONITOR_OK) {
3782                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3783                 return;
3784         }
3785
3786         if (ctdb->num_nodes != nodemap->num) {
3787                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3788                 ctdb_load_nodes_file(ctdb);
3789                 return;
3790         }
3791
3792         /* verify that all active nodes agree that we are the recmaster */
3793         switch (verify_recmaster(rec, nodemap, pnn)) {
3794         case MONITOR_RECOVERY_NEEDED:
3795                 /* can not happen */
3796                 return;
3797         case MONITOR_ELECTION_NEEDED:
3798                 force_election(rec, pnn, nodemap);
3799                 return;
3800         case MONITOR_OK:
3801                 break;
3802         case MONITOR_FAILED:
3803                 return;
3804         }
3805
3806
3807         if (rec->need_recovery) {
3808                 /* a previous recovery didn't finish */
3809                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3810                 return;
3811         }
3812
3813         /* verify that all active nodes are in normal mode 
3814            and not in recovery mode 
3815         */
3816         switch (verify_recmode(ctdb, nodemap)) {
3817         case MONITOR_RECOVERY_NEEDED:
3818                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3819                 return;
3820         case MONITOR_FAILED:
3821                 return;
3822         case MONITOR_ELECTION_NEEDED:
3823                 /* can not happen */
3824         case MONITOR_OK:
3825                 break;
3826         }
3827
3828
3829         if (ctdb->tunable.verify_recovery_lock != 0) {
3830                 /* we should have the reclock - check it's not stale */
3831                 ret = check_recovery_lock(ctdb);
3832                 if (ret != 0) {
3833                         DEBUG(DEBUG_ERR,("Failed check_recovery_lock. Force a recovery\n"));
3834                         ctdb_set_culprit(rec, ctdb->pnn);
3835                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3836                         return;
3837                 }
3838         }
3839
3840
3841         /* if there are takeovers requested, perform it and notify the waiters */
3842         if (rec->takeover_runs_disable_ctx == NULL &&
3843             rec->reallocate_requests) {
3844                 process_ipreallocate_requests(ctdb, rec);
3845         }
3846
3847         /* get the nodemap for all active remote nodes
3848          */
3849         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3850         if (remote_nodemaps == NULL) {
3851                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3852                 return;
3853         }
3854         for(i=0; i<nodemap->num; i++) {
3855                 remote_nodemaps[i] = NULL;
3856         }
3857         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3858                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3859                 return;
3860         } 
3861
3862         /* verify that all other nodes have the same nodemap as we have
3863         */
3864         for (j=0; j<nodemap->num; j++) {
3865                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3866                         continue;
3867                 }
3868
3869                 if (remote_nodemaps[j] == NULL) {
3870                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3871                         ctdb_set_culprit(rec, j);
3872
3873                         return;
3874                 }
3875
3876                 /* if the nodes disagree on how many nodes there are
3877                    then this is a good reason to try recovery
3878                  */
3879                 if (remote_nodemaps[j]->num != nodemap->num) {
3880                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3881                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3882                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3883                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3884                         return;
3885                 }
3886
3887                 /* if the nodes disagree on which nodes exist and are
3888                    active, then that is also a good reason to do recovery
3889                  */
3890                 for (i=0;i<nodemap->num;i++) {
3891                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3892                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3893                                           nodemap->nodes[j].pnn, i, 
3894                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3895                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3896                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3897                                             vnnmap);
3898                                 return;
3899                         }
3900                 }
3901         }
3902
3903         /*
3904          * Update node flags obtained from each active node. This ensures we have
3905          * up-to-date information for all the nodes.
3906          */
3907         for (j=0; j<nodemap->num; j++) {
3908                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3909                         continue;
3910                 }
3911                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3912         }
3913
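        /* Cross-check every remote node's view of the node flags against our
         * own.  On a mismatch, the remote node's view of its own flags wins,
         * while the recmaster's (our) view wins for third-party nodes; the
         * corrected flags are then pushed to the whole cluster and a
         * recovery is started. */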
3914         for (j=0; j<nodemap->num; j++) {
3915                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3916                         continue;
3917                 }
3918
3919                 /* verify the flags are consistent
3920                 */
3921                 for (i=0; i<nodemap->num; i++) {
3922                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3923                                 continue;
3924                         }
3925                         
3926                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3927                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3928                                   nodemap->nodes[j].pnn, 
3929                                   nodemap->nodes[i].pnn, 
3930                                   remote_nodemaps[j]->nodes[i].flags,
3931                                   nodemap->nodes[i].flags));
3932                                 if (i == j) {
3933                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3934                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3935                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3936                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3937                                                     vnnmap);
3938                                         return;
3939                                 } else {
3940                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3941                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3942                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3943                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3944                                                     vnnmap);
3945                                         return;
3946                                 }
3947                         }
3948                 }
3949         }
3950
3951
3952         /* There must be the same number of lmasters in the vnn map as
3953          * there are active nodes with the lmaster capability...  or
3954          * do a recovery.
3955          */
3956         if (vnnmap->size != rec->num_lmasters) {
3957                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3958                           vnnmap->size, rec->num_lmasters));
3959                 ctdb_set_culprit(rec, ctdb->pnn);
3960                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3961                 return;
3962         }
3963
3964         /* verify that all active nodes in the nodemap also exist in 
3965            the vnnmap.
3966          */
3967         for (j=0; j<nodemap->num; j++) {
3968                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3969                         continue;
3970                 }
3971                 if (nodemap->nodes[j].pnn == pnn) {
3972                         continue;
3973                 }
3974
3975                 for (i=0; i<vnnmap->size; i++) {
3976                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3977                                 break;
3978                         }
3979                 }
3980                 if (i == vnnmap->size) {
3981                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3982                                   nodemap->nodes[j].pnn));
3983                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3984                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3985                         return;
3986                 }
3987         }
3988
3989         
3990         /* verify that all other nodes have the same vnnmap
3991            and are from the same generation
3992          */
3993         for (j=0; j<nodemap->num; j++) {
3994                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3995                         continue;
3996                 }
3997                 if (nodemap->nodes[j].pnn == pnn) {
3998                         continue;
3999                 }
4000
4001                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
4002                                           mem_ctx, &remote_vnnmap);
4003                 if (ret != 0) {
4004                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
4005                                   nodemap->nodes[j].pnn));
4006                         return;
4007                 }
4008
4009                 /* verify the vnnmap generation is the same */
4010                 if (vnnmap->generation != remote_vnnmap->generation) {
4011                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
4012                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
4013                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4014                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4015                         return;
4016                 }
4017
4018                 /* verify the vnnmap size is the same */
4019                 if (vnnmap->size != remote_vnnmap->size) {
4020                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap size: %u vs %u (ours)\n", 
4021                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
4022                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4023                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4024                         return;
4025                 }
4026
4027                 /* verify the vnnmap is the same */
4028                 for (i=0;i<vnnmap->size;i++) {
4029                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
4030                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has a different vnnmap\n", 
4031                                           nodemap->nodes[j].pnn));
4032                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
4033                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
4034                                             vnnmap);
4035                                 return;
4036                         }
4037                 }
4038         }
4039
4040         /* we might need to change who has what IP assigned */
4041         if (rec->need_takeover_run) {
4042                 uint32_t culprit = (uint32_t)-1;
4043
4044                 rec->need_takeover_run = false;
4045
4046                 /* update the list of public ips that a node can handle for
4047                    all connected nodes
4048                 */
4049                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
4050                 if (ret != 0) {
4051                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
4052                                          culprit));
4053                         rec->need_takeover_run = true;
4054                         return;
4055                 }
4056
4057                 /* execute the "startrecovery" event script on all nodes */
4058                 ret = run_startrecovery_eventscript(rec, nodemap);
4059                 if (ret!=0) {
4060                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
4061                         ctdb_set_culprit(rec, ctdb->pnn);
4062                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4063                         return;
4064                 }
4065
4066                 /* If the takeover run fails, the offending nodes are
4067                  * assigned ban culprit counts and the run is retried.
4068                  * If it keeps failing, the offending node will
4069                  * eventually get banned.
4070                  *
4071                  * If rec->need_takeover_run were not set back to true
4072                  * on such a failure, monitoring would stay disabled
4073                  * cluster-wide (via the startrecovery eventscript) and never be re-enabled.
4074                  */
4075                 if (!do_takeover_run(rec, nodemap, true)) {
4076                         return;
4077                 }
4078
4079                 /* execute the "recovered" event script on all nodes */
4080                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
4081 #if 0
4082 // We can't check whether the event completed successfully, since this
4083 // script WILL fail if the node is in recovery mode.  If that race
4084 // happens, checking the result here would just trigger a second,
4085 // cascading recovery.
4086                 if (ret!=0) {
4087                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
4088                         ctdb_set_culprit(rec, ctdb->pnn);
4089                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
4090                 }
4091 #endif
4092         }
4093 }
4094
4095 /*
4096   the main monitoring loop
4097  */
4098 static void monitor_cluster(struct ctdb_context *ctdb)
4099 {
4100         struct ctdb_recoverd *rec;
4101
4102         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
4103
4104         rec = talloc_zero(ctdb, struct ctdb_recoverd);
4105         CTDB_NO_MEMORY_FATAL(ctdb, rec);
4106
4107         rec->ctdb = ctdb;
4108
4109         rec->takeover_run_in_progress = false;
4110
4111         rec->priority_time = timeval_current();
4112
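     /* Register handlers for the SRVID messages that the recovery
      * daemon services: */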
4113         /* register a message port for sending memory dumps */
4114         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
4115
4116         /* register a message port for requesting logs */
4117         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
4118
4119         /* register a message port for clearing logs */
4120         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
4121
4122         /* register a message port for recovery elections */
4123         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4124
4125         /* when nodes are disabled/enabled */
4126         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4127
4128         /* when we are asked to push out a flag change */
4129         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4130
4131         /* register a message port for vacuum fetch */
4132         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4133
4134         /* register a message port for reloadnodes  */
4135         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4136
4137         /* register a message port for performing a takeover run */
4138         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4139
4140         /* register a message port for disabling the ip check for a short while */
4141         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4142
4143         /* register a message port for updating the recovery daemon's node assignment for an ip */
4144         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4145
4146         /* register a message port for forcing a rebalance of a node at
4147            the next reallocation */
4148         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4149
4150         /* Register a message port for disabling takeover runs */
4151         ctdb_client_set_message_handler(ctdb,
4152                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4153                                         disable_takeover_runs_handler, rec);
4154
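     /* Main loop: each pass creates a fresh temporary talloc context,
      * runs main_loop() once, frees the context, and then sleeps for
      * whatever remains of the recovery interval. */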
4155         for (;;) {
4156                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4157                 struct timeval start;
4158                 double elapsed;
4159
4160                 if (!mem_ctx) {
4161                         DEBUG(DEBUG_CRIT,(__location__
4162                                           " Failed to create temp context\n"));
4163                         exit(-1);
4164                 }
4165
4166                 start = timeval_current();
4167                 main_loop(ctdb, rec, mem_ctx);
4168                 talloc_free(mem_ctx);
4169
4170                 /* we only check for recovery once every recover_interval seconds */
4171                 elapsed = timeval_elapsed(&start);
4172                 if (elapsed < ctdb->tunable.recover_interval) {
4173                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4174                                           - elapsed);
4175                 }
4176         }
4177 }
4178
4179 /*
4180   event handler for when the main ctdbd dies
4181  */
4182 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4183                                  uint16_t flags, void *private_data)
4184 {
4185         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4186         _exit(1);
4187 }
4188
4189 /*
4190   called regularly to verify that the recovery daemon is still running
4191  */
4192 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4193                               struct timeval yt, void *p)
4194 {
4195         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4196
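     /* Signal 0 does not actually send a signal; it only tests whether
      * the recovery daemon process still exists. */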
4197         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4198                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4199
4200                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4201                                 ctdb_restart_recd, ctdb);
4202
4203                 return;
4204         }
4205
4206         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4207                         timeval_current_ofs(30, 0),
4208                         ctdb_check_recd, ctdb);
4209 }
4210
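     /*
       SIGCHLD handler for the recovery daemon: reap any exited children
       (non-blocking) so they do not linger as zombies
      */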
4211 static void recd_sig_child_handler(struct event_context *ev,
4212         struct signal_event *se, int signum, int count,
4213         void *dont_care, 
4214         void *private_data)
4215 {
4216 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4217         int status;
4218         pid_t pid = -1;
4219
4220         while (pid != 0) {
4221                 pid = waitpid(-1, &status, WNOHANG);
4222                 if (pid == -1) {
4223                         if (errno != ECHILD) {
4224                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4225                         }
4226                         return;
4227                 }
4228                 if (pid > 0) {
4229                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4230                 }
4231         }
4232 }
4233
4234 /*
4235   start up the recovery daemon as a child of the main ctdb daemon
4236  */
4237 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4238 {
4239         int fd[2];
4240         struct signal_event *se;
4241         struct tevent_fd *fde;
4242
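     /* The pipe is only used to detect the death of the parent: the
      * parent keeps fd[1] open, the child watches fd[0] and sees it
      * become readable (EOF) when the parent exits; see
      * ctdb_recoverd_parent() above. */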
4243         if (pipe(fd) != 0) {
4244                 return -1;
4245         }
4246
4247         ctdb->ctdbd_pid = getpid();
4248
4249         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4250         if (ctdb->recoverd_pid == -1) {
4251                 return -1;
4252         }
4253
4254         if (ctdb->recoverd_pid != 0) {
4255                 talloc_free(ctdb->recd_ctx);
4256                 ctdb->recd_ctx = talloc_new(ctdb);
4257                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4258
4259                 close(fd[0]);
4260                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4261                                 timeval_current_ofs(30, 0),
4262                                 ctdb_check_recd, ctdb);
4263                 return 0;
4264         }
4265
4266         close(fd[1]);
4267
4268         srandom(getpid() ^ time(NULL));
4269
4270         /* Clear the log ringbuffer */
4271         ctdb_clear_log(ctdb);
4272
4273         ctdb_set_process_name("ctdb_recoverd");
4274         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4275                 DEBUG(DEBUG_CRIT, (__location__ " ERROR: failed to switch recovery daemon into client mode. Shutting down.\n"));
4276                 exit(1);
4277         }
4278
4279         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4280
4281         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4282                      ctdb_recoverd_parent, &fd[0]);
4283         tevent_fd_set_auto_close(fde);
4284
4285         /* set up a handler to pick up sigchld */
4286         se = event_add_signal(ctdb->ev, ctdb,
4287                                      SIGCHLD, 0,
4288                                      recd_sig_child_handler,
4289                                      ctdb);
4290         if (se == NULL) {
4291                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4292                 exit(1);
4293         }
4294
4295         monitor_cluster(ctdb);
4296
4297         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4298         return -1;
4299 }
4300
4301 /*
4302   shut down the recovery daemon
4303  */
4304 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4305 {
4306         if (ctdb->recoverd_pid == 0) {
4307                 return;
4308         }
4309
4310         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4311         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4312
4313         TALLOC_FREE(ctdb->recd_ctx);
4314         TALLOC_FREE(ctdb->recd_ping_count);
4315 }
4316
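     /*
       called via a timed event from ctdb_check_recd() when the recovery
       daemon is found to be dead: stop whatever is left of it and start
       a new one
      */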
4317 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4318                        struct timeval t, void *private_data)
4319 {
4320         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4321
4322         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4323         ctdb_stop_recoverd(ctdb);
4324         ctdb_start_recoverd(ctdb);
4325 }