793ef1f80ff11c7f0c356bdcf3a5a734de16cc6c
[ctdb.git] / server / ctdb_recoverd.c
1 /* 
2    ctdb recovery daemon
3
4    Copyright (C) Ronnie Sahlberg  2007
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "system/filesys.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "system/wait.h"
25 #include "popt.h"
26 #include "cmdline.h"
27 #include "../include/ctdb_client.h"
28 #include "../include/ctdb_private.h"
29 #include "db_wrap.h"
30 #include "lib/util/dlinklist.h"
31
32
/*
 * One element of the list of SRVID requests that need to be processed.
 * Elements are chained through next/prev (see the DLIST_ADD() call in
 * srvid_request_add()).
 */
struct srvid_list {
	struct srvid_list *next;
	struct srvid_list *prev;
	struct srvid_request *request;	/* the request carried by this element */
};
38
/* Container holding the head of a list of queued SRVID requests. */
struct srvid_requests {
	struct srvid_list *requests;	/* list head; NULL while nothing is queued */
};
42
43 static void srvid_request_reply(struct ctdb_context *ctdb,
44                                 struct srvid_request *request,
45                                 TDB_DATA result)
46 {
47         /* Someone that sent srvid==0 does not want a reply */
48         if (request->srvid == 0) {
49                 talloc_free(request);
50                 return;
51         }
52
53         if (ctdb_client_send_message(ctdb, request->pnn, request->srvid,
54                                      result) == 0) {
55                 DEBUG(DEBUG_INFO,("Sent SRVID reply to %u:%llu\n",
56                                   (unsigned)request->pnn,
57                                   (unsigned long long)request->srvid));
58         } else {
59                 DEBUG(DEBUG_ERR,("Failed to send SRVID reply to %u:%llu\n",
60                                  (unsigned)request->pnn,
61                                  (unsigned long long)request->srvid));
62         }
63
64         talloc_free(request);
65 }
66
67 static void srvid_requests_reply(struct ctdb_context *ctdb,
68                                  struct srvid_requests **requests,
69                                  TDB_DATA result)
70 {
71         struct srvid_list *r;
72
73         for (r = (*requests)->requests; r != NULL; r = r->next) {
74                 srvid_request_reply(ctdb, r->request, result);
75         }
76
77         /* Free the list structure... */
78         TALLOC_FREE(*requests);
79 }
80
81 static void srvid_request_add(struct ctdb_context *ctdb,
82                               struct srvid_requests **requests,
83                               struct srvid_request *request)
84 {
85         struct srvid_list *t;
86         int32_t ret;
87         TDB_DATA result;
88
89         if (*requests == NULL) {
90                 *requests = talloc_zero(ctdb, struct srvid_requests);
91                 if (*requests == NULL) {
92                         goto nomem;
93                 }
94         }
95
96         t = talloc_zero(*requests, struct srvid_list);
97         if (t == NULL) {
98                 /* If *requests was just allocated above then free it */
99                 if ((*requests)->requests == NULL) {
100                         TALLOC_FREE(*requests);
101                 }
102                 goto nomem;
103         }
104
105         t->request = (struct srvid_request *)talloc_steal(t, request);
106         DLIST_ADD((*requests)->requests, t);
107
108         return;
109
110 nomem:
111         /* Failed to add the request to the list.  Send a fail. */
112         DEBUG(DEBUG_ERR, (__location__
113                           " Out of memory, failed to queue SRVID request\n"));
114         ret = -ENOMEM;
115         result.dsize = sizeof(ret);
116         result.dptr = (uint8_t *)&ret;
117         srvid_request_reply(ctdb, request, result);
118 }
119
/*
 * Misbehaviour bookkeeping attached to a node (see
 * ctdb_set_culprit_count(), which maintains both fields).
 */
struct ctdb_banning_state {
	uint32_t count;				/* accumulated culprit credits */
	struct timeval last_reported_time;	/* time of the latest report */
};
124
125 /*
126   private state of recovery daemon
127  */
128 struct ctdb_recoverd {
129         struct ctdb_context *ctdb;
130         uint32_t recmaster;
131         uint32_t num_active;
132         uint32_t num_lmasters;
133         uint32_t num_connected;
134         uint32_t last_culprit_node;
135         struct ctdb_node_map *nodemap;
136         struct timeval priority_time;
137         bool need_takeover_run;
138         bool need_recovery;
139         uint32_t node_flags;
140         struct timed_event *send_election_te;
141         struct timed_event *election_timeout;
142         struct vacuum_info *vacuum_info;
143         struct srvid_requests *reallocate_requests;
144         bool takeover_run_in_progress;
145         TALLOC_CTX *takeover_runs_disable_ctx;
146         struct ctdb_control_get_ifaces *ifaces;
147         uint32_t *force_rebalance_nodes;
148 };
149
/*
 * Timeout values produced by timeval_current_ofs() from the
 * recover_timeout and recover_interval tunables.  Both macros expand to
 * an expression that refers to a variable named "ctdb", so one must be
 * in scope wherever they are used.
 */
#define CONTROL_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_timeout, 0)
#define MONITOR_TIMEOUT() timeval_current_ofs(ctdb->tunable.recover_interval, 0)

/* Forward declaration; the function is static, so it is defined in this file. */
static void ctdb_restart_recd(struct event_context *ev,
			      struct timed_event *te,
			      struct timeval t,
			      void *private_data);
154
155 /*
156   ban a node for a period of time
157  */
158 static void ctdb_ban_node(struct ctdb_recoverd *rec, uint32_t pnn, uint32_t ban_time)
159 {
160         int ret;
161         struct ctdb_context *ctdb = rec->ctdb;
162         struct ctdb_ban_time bantime;
163        
164         if (!ctdb_validate_pnn(ctdb, pnn)) {
165                 DEBUG(DEBUG_ERR,("Bad pnn %u in ctdb_ban_node\n", pnn));
166                 return;
167         }
168
169         DEBUG(DEBUG_NOTICE,("Banning node %u for %u seconds\n", pnn, ban_time));
170
171         bantime.pnn  = pnn;
172         bantime.time = ban_time;
173
174         ret = ctdb_ctrl_set_ban(ctdb, CONTROL_TIMEOUT(), pnn, &bantime);
175         if (ret != 0) {
176                 DEBUG(DEBUG_ERR,(__location__ " Failed to ban node %d\n", pnn));
177                 return;
178         }
179
180 }
181
/* Monitoring result codes. */
enum monitor_result {
	MONITOR_OK,
	MONITOR_RECOVERY_NEEDED,
	MONITOR_ELECTION_NEEDED,
	MONITOR_FAILED
};
183
184
185 /*
186   remember the trouble maker
187  */
188 static void ctdb_set_culprit_count(struct ctdb_recoverd *rec, uint32_t culprit, uint32_t count)
189 {
190         struct ctdb_context *ctdb = talloc_get_type(rec->ctdb, struct ctdb_context);
191         struct ctdb_banning_state *ban_state;
192
193         if (culprit > ctdb->num_nodes) {
194                 DEBUG(DEBUG_ERR,("Trying to set culprit %d but num_nodes is %d\n", culprit, ctdb->num_nodes));
195                 return;
196         }
197
198         /* If we are banned or stopped, do not set other nodes as culprits */
199         if (rec->node_flags & NODE_FLAGS_INACTIVE) {
200                 DEBUG(DEBUG_NOTICE, ("This node is INACTIVE, cannot set culprit node %d\n", culprit));
201                 return;
202         }
203
204         if (ctdb->nodes[culprit]->ban_state == NULL) {
205                 ctdb->nodes[culprit]->ban_state = talloc_zero(ctdb->nodes[culprit], struct ctdb_banning_state);
206                 CTDB_NO_MEMORY_VOID(ctdb, ctdb->nodes[culprit]->ban_state);
207
208                 
209         }
210         ban_state = ctdb->nodes[culprit]->ban_state;
211         if (timeval_elapsed(&ban_state->last_reported_time) > ctdb->tunable.recovery_grace_period) {
212                 /* this was the first time in a long while this node
213                    misbehaved so we will forgive any old transgressions.
214                 */
215                 ban_state->count = 0;
216         }
217
218         ban_state->count += count;
219         ban_state->last_reported_time = timeval_current();
220         rec->last_culprit_node = culprit;
221 }
222
/*
 * Remember the trouble maker: charge node culprit with a single credit.
 */
static void ctdb_set_culprit(struct ctdb_recoverd *rec, uint32_t culprit)
{
	const uint32_t one_credit = 1;

	ctdb_set_culprit_count(rec, culprit, one_credit);
}
230
231
232 /* this callback is called for every node that failed to execute the
233    recovered event
234 */
235 static void recovered_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
236 {
237         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
238
239         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the recovered event. Setting it as recovery fail culprit\n", node_pnn));
240
241         ctdb_set_culprit(rec, node_pnn);
242 }
243
244 /*
245   run the "recovered" eventscript on all nodes
246  */
247 static int run_recovered_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, const char *caller)
248 {
249         TALLOC_CTX *tmp_ctx;
250         uint32_t *nodes;
251         struct ctdb_context *ctdb = rec->ctdb;
252
253         tmp_ctx = talloc_new(ctdb);
254         CTDB_NO_MEMORY(ctdb, tmp_ctx);
255
256         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
257         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_END_RECOVERY,
258                                         nodes, 0,
259                                         CONTROL_TIMEOUT(), false, tdb_null,
260                                         NULL, recovered_fail_callback,
261                                         rec) != 0) {
262                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event when called from %s\n", caller));
263
264                 talloc_free(tmp_ctx);
265                 return -1;
266         }
267
268         talloc_free(tmp_ctx);
269         return 0;
270 }
271
272 /* this callback is called for every node that failed to execute the
273    start recovery event
274 */
275 static void startrecovery_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
276 {
277         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
278
279         DEBUG(DEBUG_ERR, (__location__ " Node %u failed the startrecovery event. Setting it as recovery fail culprit\n", node_pnn));
280
281         ctdb_set_culprit(rec, node_pnn);
282 }
283
284 /*
285   run the "startrecovery" eventscript on all nodes
286  */
287 static int run_startrecovery_eventscript(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
288 {
289         TALLOC_CTX *tmp_ctx;
290         uint32_t *nodes;
291         struct ctdb_context *ctdb = rec->ctdb;
292
293         tmp_ctx = talloc_new(ctdb);
294         CTDB_NO_MEMORY(ctdb, tmp_ctx);
295
296         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
297         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_START_RECOVERY,
298                                         nodes, 0,
299                                         CONTROL_TIMEOUT(), false, tdb_null,
300                                         NULL,
301                                         startrecovery_fail_callback,
302                                         rec) != 0) {
303                 DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event. Recovery failed.\n"));
304                 talloc_free(tmp_ctx);
305                 return -1;
306         }
307
308         talloc_free(tmp_ctx);
309         return 0;
310 }
311
312 static void async_getcap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
313 {
314         if ( (outdata.dsize != sizeof(uint32_t)) || (outdata.dptr == NULL) ) {
315                 DEBUG(DEBUG_ERR, (__location__ " Invalid length/pointer for getcap callback : %u %p\n",  (unsigned)outdata.dsize, outdata.dptr));
316                 return;
317         }
318         if (node_pnn < ctdb->num_nodes) {
319                 ctdb->nodes[node_pnn]->capabilities = *((uint32_t *)outdata.dptr);
320         }
321
322         if (node_pnn == ctdb->pnn) {
323                 ctdb->capabilities = ctdb->nodes[node_pnn]->capabilities;
324         }
325 }
326
327 /*
328   update the node capabilities for all connected nodes
329  */
330 static int update_capabilities(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
331 {
332         uint32_t *nodes;
333         TALLOC_CTX *tmp_ctx;
334
335         tmp_ctx = talloc_new(ctdb);
336         CTDB_NO_MEMORY(ctdb, tmp_ctx);
337
338         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
339         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_CAPABILITIES,
340                                         nodes, 0,
341                                         CONTROL_TIMEOUT(),
342                                         false, tdb_null,
343                                         async_getcap_callback, NULL,
344                                         NULL) != 0) {
345                 DEBUG(DEBUG_ERR, (__location__ " Failed to read node capabilities.\n"));
346                 talloc_free(tmp_ctx);
347                 return -1;
348         }
349
350         talloc_free(tmp_ctx);
351         return 0;
352 }
353
354 static void set_recmode_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
355 {
356         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
357
358         DEBUG(DEBUG_ERR,("Failed to freeze node %u during recovery. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
359         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
360 }
361
362 static void transaction_start_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
363 {
364         struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
365
366         DEBUG(DEBUG_ERR,("Failed to start recovery transaction on node %u. Set it as ban culprit for %d credits\n", node_pnn, rec->nodemap->num));
367         ctdb_set_culprit_count(rec, node_pnn, rec->nodemap->num);
368 }
369
370 /*
371   change recovery mode on all nodes
372  */
373 static int set_recovery_mode(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t rec_mode)
374 {
375         TDB_DATA data;
376         uint32_t *nodes;
377         TALLOC_CTX *tmp_ctx;
378
379         tmp_ctx = talloc_new(ctdb);
380         CTDB_NO_MEMORY(ctdb, tmp_ctx);
381
382         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
383
384         data.dsize = sizeof(uint32_t);
385         data.dptr = (unsigned char *)&rec_mode;
386
387         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMODE,
388                                         nodes, 0,
389                                         CONTROL_TIMEOUT(),
390                                         false, data,
391                                         NULL, NULL,
392                                         NULL) != 0) {
393                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode. Recovery failed.\n"));
394                 talloc_free(tmp_ctx);
395                 return -1;
396         }
397
398         /* freeze all nodes */
399         if (rec_mode == CTDB_RECOVERY_ACTIVE) {
400                 int i;
401
402                 for (i=1; i<=NUM_DB_PRIORITIES; i++) {
403                         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_FREEZE,
404                                                 nodes, i,
405                                                 CONTROL_TIMEOUT(),
406                                                 false, tdb_null,
407                                                 NULL,
408                                                 set_recmode_fail_callback,
409                                                 rec) != 0) {
410                                 DEBUG(DEBUG_ERR, (__location__ " Unable to freeze nodes. Recovery failed.\n"));
411                                 talloc_free(tmp_ctx);
412                                 return -1;
413                         }
414                 }
415         }
416
417         talloc_free(tmp_ctx);
418         return 0;
419 }
420
421 /*
422   change recovery master on all node
423  */
424 static int set_recovery_master(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn)
425 {
426         TDB_DATA data;
427         TALLOC_CTX *tmp_ctx;
428         uint32_t *nodes;
429
430         tmp_ctx = talloc_new(ctdb);
431         CTDB_NO_MEMORY(ctdb, tmp_ctx);
432
433         data.dsize = sizeof(uint32_t);
434         data.dptr = (unsigned char *)&pnn;
435
436         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
437         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECMASTER,
438                                         nodes, 0,
439                                         CONTROL_TIMEOUT(), false, data,
440                                         NULL, NULL,
441                                         NULL) != 0) {
442                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recmaster. Recovery failed.\n"));
443                 talloc_free(tmp_ctx);
444                 return -1;
445         }
446
447         talloc_free(tmp_ctx);
448         return 0;
449 }
450
451 /* update all remote nodes to use the same db priority that we have
452    this can fail if the remove node has not yet been upgraded to 
453    support this function, so we always return success and never fail
454    a recovery if this call fails.
455 */
456 static int update_db_priority_on_remote_nodes(struct ctdb_context *ctdb,
457         struct ctdb_node_map *nodemap, 
458         uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
459 {
460         int db;
461
462         /* step through all local databases */
463         for (db=0; db<dbmap->num;db++) {
464                 struct ctdb_db_priority db_prio;
465                 int ret;
466
467                 db_prio.db_id     = dbmap->dbs[db].dbid;
468                 ret = ctdb_ctrl_get_db_priority(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, dbmap->dbs[db].dbid, &db_prio.priority);
469                 if (ret != 0) {
470                         DEBUG(DEBUG_ERR,(__location__ " Failed to read database priority from local node for db 0x%08x\n", dbmap->dbs[db].dbid));
471                         continue;
472                 }
473
474                 DEBUG(DEBUG_INFO,("Update DB priority for db 0x%08x to %u\n", dbmap->dbs[db].dbid, db_prio.priority)); 
475
476                 ret = ctdb_ctrl_set_db_priority(ctdb, CONTROL_TIMEOUT(),
477                                                 CTDB_CURRENT_NODE, &db_prio);
478                 if (ret != 0) {
479                         DEBUG(DEBUG_ERR,(__location__ " Failed to set DB priority for 0x%08x\n",
480                                          db_prio.db_id));
481                 }
482         }
483
484         return 0;
485 }                       
486
487 /*
488   ensure all other nodes have attached to any databases that we have
489  */
490 static int create_missing_remote_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
491                                            uint32_t pnn, struct ctdb_dbid_map *dbmap, TALLOC_CTX *mem_ctx)
492 {
493         int i, j, db, ret;
494         struct ctdb_dbid_map *remote_dbmap;
495
496         /* verify that all other nodes have all our databases */
497         for (j=0; j<nodemap->num; j++) {
498                 /* we dont need to ourself ourselves */
499                 if (nodemap->nodes[j].pnn == pnn) {
500                         continue;
501                 }
502                 /* dont check nodes that are unavailable */
503                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
504                         continue;
505                 }
506
507                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
508                                          mem_ctx, &remote_dbmap);
509                 if (ret != 0) {
510                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
511                         return -1;
512                 }
513
514                 /* step through all local databases */
515                 for (db=0; db<dbmap->num;db++) {
516                         const char *name;
517
518
519                         for (i=0;i<remote_dbmap->num;i++) {
520                                 if (dbmap->dbs[db].dbid == remote_dbmap->dbs[i].dbid) {
521                                         break;
522                                 }
523                         }
524                         /* the remote node already have this database */
525                         if (i!=remote_dbmap->num) {
526                                 continue;
527                         }
528                         /* ok so we need to create this database */
529                         ret = ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), pnn,
530                                                   dbmap->dbs[db].dbid, mem_ctx,
531                                                   &name);
532                         if (ret != 0) {
533                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", pnn));
534                                 return -1;
535                         }
536                         ret = ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(),
537                                                  nodemap->nodes[j].pnn,
538                                                  mem_ctx, name,
539                                                  dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
540                         if (ret != 0) {
541                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create remote db:%s\n", name));
542                                 return -1;
543                         }
544                 }
545         }
546
547         return 0;
548 }
549
550
551 /*
552   ensure we are attached to any databases that anyone else is attached to
553  */
554 static int create_missing_local_databases(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
555                                           uint32_t pnn, struct ctdb_dbid_map **dbmap, TALLOC_CTX *mem_ctx)
556 {
557         int i, j, db, ret;
558         struct ctdb_dbid_map *remote_dbmap;
559
560         /* verify that we have all database any other node has */
561         for (j=0; j<nodemap->num; j++) {
562                 /* we dont need to ourself ourselves */
563                 if (nodemap->nodes[j].pnn == pnn) {
564                         continue;
565                 }
566                 /* dont check nodes that are unavailable */
567                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
568                         continue;
569                 }
570
571                 ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
572                                          mem_ctx, &remote_dbmap);
573                 if (ret != 0) {
574                         DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node %u\n", pnn));
575                         return -1;
576                 }
577
578                 /* step through all databases on the remote node */
579                 for (db=0; db<remote_dbmap->num;db++) {
580                         const char *name;
581
582                         for (i=0;i<(*dbmap)->num;i++) {
583                                 if (remote_dbmap->dbs[db].dbid == (*dbmap)->dbs[i].dbid) {
584                                         break;
585                                 }
586                         }
587                         /* we already have this db locally */
588                         if (i!=(*dbmap)->num) {
589                                 continue;
590                         }
591                         /* ok so we need to create this database and
592                            rebuild dbmap
593                          */
594                         ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
595                                             remote_dbmap->dbs[db].dbid, mem_ctx, &name);
596                         if (ret != 0) {
597                                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbname from node %u\n", 
598                                           nodemap->nodes[j].pnn));
599                                 return -1;
600                         }
601                         ctdb_ctrl_createdb(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, name, 
602                                            remote_dbmap->dbs[db].flags & CTDB_DB_FLAGS_PERSISTENT);
603                         if (ret != 0) {
604                                 DEBUG(DEBUG_ERR, (__location__ " Unable to create local db:%s\n", name));
605                                 return -1;
606                         }
607                         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, dbmap);
608                         if (ret != 0) {
609                                 DEBUG(DEBUG_ERR, (__location__ " Unable to reread dbmap on node %u\n", pnn));
610                                 return -1;
611                         }
612                 }
613         }
614
615         return 0;
616 }
617
618
619 /*
620   pull the remote database contents from one node into the recdb
621  */
622 static int pull_one_remote_database(struct ctdb_context *ctdb, uint32_t srcnode, 
623                                     struct tdb_wrap *recdb, uint32_t dbid)
624 {
625         int ret;
626         TDB_DATA outdata;
627         struct ctdb_marshall_buffer *reply;
628         struct ctdb_rec_data *rec;
629         int i;
630         TALLOC_CTX *tmp_ctx = talloc_new(recdb);
631
632         ret = ctdb_ctrl_pulldb(ctdb, srcnode, dbid, CTDB_LMASTER_ANY, tmp_ctx,
633                                CONTROL_TIMEOUT(), &outdata);
634         if (ret != 0) {
635                 DEBUG(DEBUG_ERR,(__location__ " Unable to copy db from node %u\n", srcnode));
636                 talloc_free(tmp_ctx);
637                 return -1;
638         }
639
640         reply = (struct ctdb_marshall_buffer *)outdata.dptr;
641
642         if (outdata.dsize < offsetof(struct ctdb_marshall_buffer, data)) {
643                 DEBUG(DEBUG_ERR,(__location__ " invalid data in pulldb reply\n"));
644                 talloc_free(tmp_ctx);
645                 return -1;
646         }
647         
648         rec = (struct ctdb_rec_data *)&reply->data[0];
649         
650         for (i=0;
651              i<reply->count;
652              rec = (struct ctdb_rec_data *)(rec->length + (uint8_t *)rec), i++) {
653                 TDB_DATA key, data;
654                 struct ctdb_ltdb_header *hdr;
655                 TDB_DATA existing;
656                 
657                 key.dptr = &rec->data[0];
658                 key.dsize = rec->keylen;
659                 data.dptr = &rec->data[key.dsize];
660                 data.dsize = rec->datalen;
661                 
662                 hdr = (struct ctdb_ltdb_header *)data.dptr;
663
664                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
665                         DEBUG(DEBUG_CRIT,(__location__ " bad ltdb record\n"));
666                         talloc_free(tmp_ctx);
667                         return -1;
668                 }
669
670                 /* fetch the existing record, if any */
671                 existing = tdb_fetch(recdb->tdb, key);
672                 
673                 if (existing.dptr != NULL) {
674                         struct ctdb_ltdb_header header;
675                         if (existing.dsize < sizeof(struct ctdb_ltdb_header)) {
676                                 DEBUG(DEBUG_CRIT,(__location__ " Bad record size %u from node %u\n", 
677                                          (unsigned)existing.dsize, srcnode));
678                                 free(existing.dptr);
679                                 talloc_free(tmp_ctx);
680                                 return -1;
681                         }
682                         header = *(struct ctdb_ltdb_header *)existing.dptr;
683                         free(existing.dptr);
684                         if (!(header.rsn < hdr->rsn ||
685                               (header.dmaster != ctdb->recovery_master && header.rsn == hdr->rsn))) {
686                                 continue;
687                         }
688                 }
689                 
690                 if (tdb_store(recdb->tdb, key, data, TDB_REPLACE) != 0) {
691                         DEBUG(DEBUG_CRIT,(__location__ " Failed to store record\n"));
692                         talloc_free(tmp_ctx);
693                         return -1;                              
694                 }
695         }
696
697         talloc_free(tmp_ctx);
698
699         return 0;
700 }
701
702
/*
 * State shared between pull_highest_seqnum_pdb() and its per-node
 * callbacks.
 */
struct pull_seqnum_cbdata {
	int failed;		/* non-zero once any node has failed */
	uint32_t pnn;		/* node holding the best seqnum so far; starts as -1 */
	uint64_t seqnum;	/* best seqnum seen so far */
};
708
709 static void pull_seqnum_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
710 {
711         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
712         uint64_t seqnum;
713
714         if (cb_data->failed != 0) {
715                 DEBUG(DEBUG_ERR, ("Got seqnum from node %d but we have already failed the entire operation\n", node_pnn));
716                 return;
717         }
718
719         if (res != 0) {
720                 DEBUG(DEBUG_ERR, ("Error when pulling seqnum from node %d\n", node_pnn));
721                 cb_data->failed = 1;
722                 return;
723         }
724
725         if (outdata.dsize != sizeof(uint64_t)) {
726                 DEBUG(DEBUG_ERR, ("Error when reading pull seqnum from node %d, got %d bytes but expected %d\n", node_pnn, (int)outdata.dsize, (int)sizeof(uint64_t)));
727                 cb_data->failed = -1;
728                 return;
729         }
730
731         seqnum = *((uint64_t *)outdata.dptr);
732
733         if (seqnum > cb_data->seqnum ||
734             (cb_data->pnn == -1 && seqnum == 0)) {
735                 cb_data->seqnum = seqnum;
736                 cb_data->pnn = node_pnn;
737         }
738 }
739
740 static void pull_seqnum_fail_cb(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
741 {
742         struct pull_seqnum_cbdata *cb_data = talloc_get_type(callback_data, struct pull_seqnum_cbdata);
743
744         DEBUG(DEBUG_ERR, ("Failed to pull db seqnum from node %d\n", node_pnn));
745         cb_data->failed = 1;
746 }
747
748 static int pull_highest_seqnum_pdb(struct ctdb_context *ctdb,
749                                 struct ctdb_recoverd *rec, 
750                                 struct ctdb_node_map *nodemap, 
751                                 struct tdb_wrap *recdb, uint32_t dbid)
752 {
753         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
754         uint32_t *nodes;
755         TDB_DATA data;
756         uint32_t outdata[2];
757         struct pull_seqnum_cbdata *cb_data;
758
759         DEBUG(DEBUG_NOTICE, ("Scan for highest seqnum pdb for db:0x%08x\n", dbid));
760
761         outdata[0] = dbid;
762         outdata[1] = 0;
763
764         data.dsize = sizeof(outdata);
765         data.dptr  = (uint8_t *)&outdata[0];
766
767         cb_data = talloc(tmp_ctx, struct pull_seqnum_cbdata);
768         if (cb_data == NULL) {
769                 DEBUG(DEBUG_ERR, ("Failed to allocate pull highest seqnum cb_data structure\n"));
770                 talloc_free(tmp_ctx);
771                 return -1;
772         }
773
774         cb_data->failed = 0;
775         cb_data->pnn    = -1;
776         cb_data->seqnum = 0;
777         
778         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
779         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_DB_SEQNUM,
780                                         nodes, 0,
781                                         CONTROL_TIMEOUT(), false, data,
782                                         pull_seqnum_cb,
783                                         pull_seqnum_fail_cb,
784                                         cb_data) != 0) {
785                 DEBUG(DEBUG_ERR, (__location__ " Failed to run async GET_DB_SEQNUM\n"));
786
787                 talloc_free(tmp_ctx);
788                 return -1;
789         }
790
791         if (cb_data->failed != 0) {
792                 DEBUG(DEBUG_NOTICE, ("Failed to pull sequence numbers for DB 0x%08x\n", dbid));
793                 talloc_free(tmp_ctx);
794                 return -1;
795         }
796
797         if (cb_data->pnn == -1) {
798                 DEBUG(DEBUG_NOTICE, ("Failed to find a node with highest sequence numbers for DB 0x%08x\n", dbid));
799                 talloc_free(tmp_ctx);
800                 return -1;
801         }
802
803         DEBUG(DEBUG_NOTICE, ("Pull persistent db:0x%08x from node %d with highest seqnum:%lld\n", dbid, cb_data->pnn, (long long)cb_data->seqnum)); 
804
805         if (pull_one_remote_database(ctdb, cb_data->pnn, recdb, dbid) != 0) {
806                 DEBUG(DEBUG_ERR, ("Failed to pull higest seqnum database 0x%08x from node %d\n", dbid, cb_data->pnn));
807                 talloc_free(tmp_ctx);
808                 return -1;
809         }
810
811         talloc_free(tmp_ctx);
812         return 0;
813 }
814
815
816 /*
817   pull all the remote database contents into the recdb
818  */
819 static int pull_remote_database(struct ctdb_context *ctdb,
820                                 struct ctdb_recoverd *rec, 
821                                 struct ctdb_node_map *nodemap, 
822                                 struct tdb_wrap *recdb, uint32_t dbid,
823                                 bool persistent)
824 {
825         int j;
826
827         if (persistent && ctdb->tunable.recover_pdb_by_seqnum != 0) {
828                 int ret;
829                 ret = pull_highest_seqnum_pdb(ctdb, rec, nodemap, recdb, dbid);
830                 if (ret == 0) {
831                         return 0;
832                 }
833         }
834
835         /* pull all records from all other nodes across onto this node
836            (this merges based on rsn)
837         */
838         for (j=0; j<nodemap->num; j++) {
839                 /* dont merge from nodes that are unavailable */
840                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
841                         continue;
842                 }
843                 if (pull_one_remote_database(ctdb, nodemap->nodes[j].pnn, recdb, dbid) != 0) {
844                         DEBUG(DEBUG_ERR,(__location__ " Failed to pull remote database from node %u\n", 
845                                  nodemap->nodes[j].pnn));
846                         ctdb_set_culprit_count(rec, nodemap->nodes[j].pnn, nodemap->num);
847                         return -1;
848                 }
849         }
850         
851         return 0;
852 }
853
854
855 /*
856   update flags on all active nodes
857  */
858 static int update_flags_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, uint32_t pnn, uint32_t flags)
859 {
860         int ret;
861
862         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), pnn, flags, ~flags);
863                 if (ret != 0) {
864                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
865                 return -1;
866         }
867
868         return 0;
869 }
870
871 /*
872   ensure all nodes have the same vnnmap we do
873  */
874 static int update_vnnmap_on_all_nodes(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap, 
875                                       uint32_t pnn, struct ctdb_vnn_map *vnnmap, TALLOC_CTX *mem_ctx)
876 {
877         int j, ret;
878
879         /* push the new vnn map out to all the nodes */
880         for (j=0; j<nodemap->num; j++) {
881                 /* dont push to nodes that are unavailable */
882                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
883                         continue;
884                 }
885
886                 ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, mem_ctx, vnnmap);
887                 if (ret != 0) {
888                         DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
889                         return -1;
890                 }
891         }
892
893         return 0;
894 }
895
896
/*
  state of one in-progress vacuum fetch; entries are linked into the
  vacuum_info list of the recovery daemon
 */
struct vacuum_info {
        /* list linkage */
        struct vacuum_info *next;
        struct vacuum_info *prev;
        /* recovery daemon whose vacuum_info list holds this entry */
        struct ctdb_recoverd *rec;
        /* node the records came from (reqid of the first record) */
        uint32_t srcnode;
        /* database the records belong to */
        struct ctdb_db_context *ctdb_db;
        /* private copy of the received marshall buffer */
        struct ctdb_marshall_buffer *recs;
        /* next record inside recs still to be processed */
        struct ctdb_rec_data *r;
};

static void vacuum_fetch_next(struct vacuum_info *v);
907
/*
  completion callback for a vacuum fetch call: nothing to inspect, just
  release the call state
 */
static void vacuum_fetch_callback(struct ctdb_client_call_state *state)
{
        struct ctdb_client_call_state *finished = state;

        talloc_free(finished);
}
915
916
917 /*
918   process the next element from the vacuum list
919 */
920 static void vacuum_fetch_next(struct vacuum_info *v)
921 {
922         struct ctdb_call call;
923         struct ctdb_rec_data *r;
924
925         while (v->recs->count) {
926                 struct ctdb_client_call_state *state;
927                 TDB_DATA data;
928                 struct ctdb_ltdb_header *hdr;
929
930                 ZERO_STRUCT(call);
931                 call.call_id = CTDB_NULL_FUNC;
932                 call.flags = CTDB_IMMEDIATE_MIGRATION;
933                 call.flags |= CTDB_CALL_FLAG_VACUUM_MIGRATION;
934
935                 r = v->r;
936                 v->r = (struct ctdb_rec_data *)(r->length + (uint8_t *)r);
937                 v->recs->count--;
938
939                 call.key.dptr = &r->data[0];
940                 call.key.dsize = r->keylen;
941
942                 /* ensure we don't block this daemon - just skip a record if we can't get
943                    the chainlock */
944                 if (tdb_chainlock_nonblock(v->ctdb_db->ltdb->tdb, call.key) != 0) {
945                         continue;
946                 }
947
948                 data = tdb_fetch(v->ctdb_db->ltdb->tdb, call.key);
949                 if (data.dptr == NULL) {
950                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
951                         continue;
952                 }
953
954                 if (data.dsize < sizeof(struct ctdb_ltdb_header)) {
955                         free(data.dptr);
956                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
957                         continue;
958                 }
959                 
960                 hdr = (struct ctdb_ltdb_header *)data.dptr;
961                 if (hdr->dmaster == v->rec->ctdb->pnn) {
962                         /* its already local */
963                         free(data.dptr);
964                         tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
965                         continue;
966                 }
967
968                 free(data.dptr);
969
970                 state = ctdb_call_send(v->ctdb_db, &call);
971                 tdb_chainunlock(v->ctdb_db->ltdb->tdb, call.key);
972                 if (state == NULL) {
973                         DEBUG(DEBUG_ERR,(__location__ " Failed to setup vacuum fetch call\n"));
974                         talloc_free(v);
975                         return;
976                 }
977                 state->async.fn = vacuum_fetch_callback;
978                 state->async.private_data = NULL;
979         }
980
981         talloc_free(v);
982 }
983
984
985 /*
986   destroy a vacuum info structure
987  */
988 static int vacuum_info_destructor(struct vacuum_info *v)
989 {
990         DLIST_REMOVE(v->rec->vacuum_info, v);
991         return 0;
992 }
993
994
995 /*
996   handler for vacuum fetch
997 */
998 static void vacuum_fetch_handler(struct ctdb_context *ctdb, uint64_t srvid, 
999                                  TDB_DATA data, void *private_data)
1000 {
1001         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
1002         struct ctdb_marshall_buffer *recs;
1003         int ret, i;
1004         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
1005         const char *name;
1006         struct ctdb_dbid_map *dbmap=NULL;
1007         bool persistent = false;
1008         struct ctdb_db_context *ctdb_db;
1009         struct ctdb_rec_data *r;
1010         uint32_t srcnode;
1011         struct vacuum_info *v;
1012
1013         recs = (struct ctdb_marshall_buffer *)data.dptr;
1014         r = (struct ctdb_rec_data *)&recs->data[0];
1015
1016         if (recs->count == 0) {
1017                 talloc_free(tmp_ctx);
1018                 return;
1019         }
1020
1021         srcnode = r->reqid;
1022
1023         for (v=rec->vacuum_info;v;v=v->next) {
1024                 if (srcnode == v->srcnode && recs->db_id == v->ctdb_db->db_id) {
1025                         /* we're already working on records from this node */
1026                         talloc_free(tmp_ctx);
1027                         return;
1028                 }
1029         }
1030
1031         /* work out if the database is persistent */
1032         ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &dbmap);
1033         if (ret != 0) {
1034                 DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from local node\n"));
1035                 talloc_free(tmp_ctx);
1036                 return;
1037         }
1038
1039         for (i=0;i<dbmap->num;i++) {
1040                 if (dbmap->dbs[i].dbid == recs->db_id) {
1041                         persistent = dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT;
1042                         break;
1043                 }
1044         }
1045         if (i == dbmap->num) {
1046                 DEBUG(DEBUG_ERR, (__location__ " Unable to find db_id 0x%x on local node\n", recs->db_id));
1047                 talloc_free(tmp_ctx);
1048                 return;         
1049         }
1050
1051         /* find the name of this database */
1052         if (ctdb_ctrl_getdbname(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, recs->db_id, tmp_ctx, &name) != 0) {
1053                 DEBUG(DEBUG_ERR,(__location__ " Failed to get name of db 0x%x\n", recs->db_id));
1054                 talloc_free(tmp_ctx);
1055                 return;
1056         }
1057
1058         /* attach to it */
1059         ctdb_db = ctdb_attach(ctdb, CONTROL_TIMEOUT(), name, persistent, 0);
1060         if (ctdb_db == NULL) {
1061                 DEBUG(DEBUG_ERR,(__location__ " Failed to attach to database '%s'\n", name));
1062                 talloc_free(tmp_ctx);
1063                 return;
1064         }
1065
1066         v = talloc_zero(rec, struct vacuum_info);
1067         if (v == NULL) {
1068                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1069                 talloc_free(tmp_ctx);
1070                 return;
1071         }
1072
1073         v->rec = rec;
1074         v->srcnode = srcnode;
1075         v->ctdb_db = ctdb_db;
1076         v->recs = talloc_memdup(v, recs, data.dsize);
1077         if (v->recs == NULL) {
1078                 DEBUG(DEBUG_CRIT,(__location__ " Out of memory\n"));
1079                 talloc_free(v);
1080                 talloc_free(tmp_ctx);
1081                 return;         
1082         }
1083         v->r =  (struct ctdb_rec_data *)&v->recs->data[0];
1084
1085         DLIST_ADD(rec->vacuum_info, v);
1086
1087         talloc_set_destructor(v, vacuum_info_destructor);
1088
1089         vacuum_fetch_next(v);
1090         talloc_free(tmp_ctx);
1091 }
1092
1093
1094 /*
1095  * handler for database detach
1096  */
1097 static void detach_database_handler(struct ctdb_context *ctdb, uint64_t srvid,
1098                                     TDB_DATA data, void *private_data)
1099 {
1100         struct ctdb_recoverd *rec = talloc_get_type(private_data,
1101                                                     struct ctdb_recoverd);
1102         uint32_t db_id;
1103         struct vacuum_info *v, *vnext;
1104         struct ctdb_db_context *ctdb_db;
1105
1106         if (data.dsize != sizeof(db_id)) {
1107                 return;
1108         }
1109         db_id = *(uint32_t *)data.dptr;
1110
1111         ctdb_db = find_ctdb_db(ctdb, db_id);
1112         if (ctdb_db == NULL) {
1113                 /* database is not attached */
1114                 return;
1115         }
1116
1117         /* Stop any active vacuum fetch */
1118         v = rec->vacuum_info;
1119         while (v != NULL) {
1120                 vnext = v->next;
1121
1122                 if (v->ctdb_db->db_id == db_id) {
1123                         talloc_free(v);
1124                 }
1125                 v = vnext;
1126         }
1127
1128         DLIST_REMOVE(ctdb->db_list, ctdb_db);
1129
1130         DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
1131                              ctdb_db->db_name));
1132         talloc_free(ctdb_db);
1133 }
1134
/*
  timed event callback armed by ctdb_wait_timeout: p points at a uint32_t
  flag which is set to 1 to signal that the wait is over
 */
static void ctdb_wait_handler(struct event_context *ev, struct timed_event *te, 
                              struct timeval yt, void *p)
{
        uint32_t *expired = p;

        *expired = 1;
}
1144
1145 /*
1146   wait for a given number of seconds
1147  */
1148 static void ctdb_wait_timeout(struct ctdb_context *ctdb, double secs)
1149 {
1150         uint32_t timed_out = 0;
1151         time_t usecs = (secs - (time_t)secs) * 1000000;
1152         event_add_timed(ctdb->ev, ctdb, timeval_current_ofs(secs, usecs), ctdb_wait_handler, &timed_out);
1153         while (!timed_out) {
1154                 event_loop_once(ctdb->ev);
1155         }
1156 }
1157
1158 /*
1159   called when an election times out (ends)
1160  */
1161 static void ctdb_election_timeout(struct event_context *ev, struct timed_event *te, 
1162                                   struct timeval t, void *p)
1163 {
1164         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
1165         rec->election_timeout = NULL;
1166         fast_start = false;
1167
1168         DEBUG(DEBUG_WARNING,("Election period ended\n"));
1169 }
1170
1171
1172 /*
1173   wait for an election to finish. It finished election_timeout seconds after
1174   the last election packet is received
1175  */
1176 static void ctdb_wait_election(struct ctdb_recoverd *rec)
1177 {
1178         struct ctdb_context *ctdb = rec->ctdb;
1179         while (rec->election_timeout) {
1180                 event_loop_once(ctdb->ev);
1181         }
1182 }
1183
1184 /*
1185   Update our local flags from all remote connected nodes. 
1186   This is only run when we are or we belive we are the recovery master
1187  */
1188 static int update_local_flags(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap)
1189 {
1190         int j;
1191         struct ctdb_context *ctdb = rec->ctdb;
1192         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
1193
1194         /* get the nodemap for all active remote nodes and verify
1195            they are the same as for this node
1196          */
1197         for (j=0; j<nodemap->num; j++) {
1198                 struct ctdb_node_map *remote_nodemap=NULL;
1199                 int ret;
1200
1201                 if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
1202                         continue;
1203                 }
1204                 if (nodemap->nodes[j].pnn == ctdb->pnn) {
1205                         continue;
1206                 }
1207
1208                 ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
1209                                            mem_ctx, &remote_nodemap);
1210                 if (ret != 0) {
1211                         DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from remote node %u\n", 
1212                                   nodemap->nodes[j].pnn));
1213                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
1214                         talloc_free(mem_ctx);
1215                         return MONITOR_FAILED;
1216                 }
1217                 if (nodemap->nodes[j].flags != remote_nodemap->nodes[j].flags) {
1218                         /* We should tell our daemon about this so it
1219                            updates its flags or else we will log the same 
1220                            message again in the next iteration of recovery.
1221                            Since we are the recovery master we can just as
1222                            well update the flags on all nodes.
1223                         */
1224                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags, ~remote_nodemap->nodes[j].flags);
1225                         if (ret != 0) {
1226                                 DEBUG(DEBUG_ERR, (__location__ " Unable to update nodeflags on remote nodes\n"));
1227                                 return -1;
1228                         }
1229
1230                         /* Update our local copy of the flags in the recovery
1231                            daemon.
1232                         */
1233                         DEBUG(DEBUG_NOTICE,("Remote node %u had flags 0x%x, local had 0x%x - updating local\n",
1234                                  nodemap->nodes[j].pnn, remote_nodemap->nodes[j].flags,
1235                                  nodemap->nodes[j].flags));
1236                         nodemap->nodes[j].flags = remote_nodemap->nodes[j].flags;
1237                 }
1238                 talloc_free(remote_nodemap);
1239         }
1240         talloc_free(mem_ctx);
1241         return MONITOR_OK;
1242 }
1243
1244
1245 /* Create a new random generation ip. 
1246    The generation id can not be the INVALID_GENERATION id
1247 */
1248 static uint32_t new_generation(void)
1249 {
1250         uint32_t generation;
1251
1252         while (1) {
1253                 generation = random();
1254
1255                 if (generation != INVALID_GENERATION) {
1256                         break;
1257                 }
1258         }
1259
1260         return generation;
1261 }
1262
1263
1264 /*
1265   create a temporary working database
1266  */
1267 static struct tdb_wrap *create_recdb(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx)
1268 {
1269         char *name;
1270         struct tdb_wrap *recdb;
1271         unsigned tdb_flags;
1272
1273         /* open up the temporary recovery database */
1274         name = talloc_asprintf(mem_ctx, "%s/recdb.tdb.%u",
1275                                ctdb->db_directory_state,
1276                                ctdb->pnn);
1277         if (name == NULL) {
1278                 return NULL;
1279         }
1280         unlink(name);
1281
1282         tdb_flags = TDB_NOLOCK;
1283         if (ctdb->valgrinding) {
1284                 tdb_flags |= TDB_NOMMAP;
1285         }
1286         tdb_flags |= (TDB_INCOMPATIBLE_HASH | TDB_DISALLOW_NESTING);
1287
1288         recdb = tdb_wrap_open(mem_ctx, name, ctdb->tunable.database_hash_size, 
1289                               tdb_flags, O_RDWR|O_CREAT|O_EXCL, 0600);
1290         if (recdb == NULL) {
1291                 DEBUG(DEBUG_CRIT,(__location__ " Failed to create temp recovery database '%s'\n", name));
1292         }
1293
1294         talloc_free(name);
1295
1296         return recdb;
1297 }
1298
1299
1300 /* 
1301    a traverse function for pulling all relevant records from recdb
1302  */
1303 struct recdb_data {
1304         struct ctdb_context *ctdb;
1305         struct ctdb_marshall_buffer *recdata;
1306         uint32_t len;
1307         uint32_t allocated_len;
1308         bool failed;
1309         bool persistent;
1310 };
1311
1312 static int traverse_recdb(struct tdb_context *tdb, TDB_DATA key, TDB_DATA data, void *p)
1313 {
1314         struct recdb_data *params = (struct recdb_data *)p;
1315         struct ctdb_rec_data *rec;
1316         struct ctdb_ltdb_header *hdr;
1317
1318         /*
1319          * skip empty records - but NOT for persistent databases:
1320          *
1321          * The record-by-record mode of recovery deletes empty records.
1322          * For persistent databases, this can lead to data corruption
1323          * by deleting records that should be there:
1324          *
1325          * - Assume the cluster has been running for a while.
1326          *
1327          * - A record R in a persistent database has been created and
1328          *   deleted a couple of times, the last operation being deletion,
1329          *   leaving an empty record with a high RSN, say 10.
1330          *
1331          * - Now a node N is turned off.
1332          *
1333          * - This leaves the local database copy of D on N with the empty
1334          *   copy of R and RSN 10. On all other nodes, the recovery has deleted
1335          *   the copy of record R.
1336          *
1337          * - Now the record is created again while node N is turned off.
1338          *   This creates R with RSN = 1 on all nodes except for N.
1339          *
1340          * - Now node N is turned on again. The following recovery will chose
1341          *   the older empty copy of R due to RSN 10 > RSN 1.
1342          *
1343          * ==> Hence the record is gone after the recovery.
1344          *
1345          * On databases like Samba's registry, this can damage the higher-level
1346          * data structures built from the various tdb-level records.
1347          */
1348         if (!params->persistent && data.dsize <= sizeof(struct ctdb_ltdb_header)) {
1349                 return 0;
1350         }
1351
1352         /* update the dmaster field to point to us */
1353         hdr = (struct ctdb_ltdb_header *)data.dptr;
1354         if (!params->persistent) {
1355                 hdr->dmaster = params->ctdb->pnn;
1356                 hdr->flags |= CTDB_REC_FLAG_MIGRATED_WITH_DATA;
1357         }
1358
1359         /* add the record to the blob ready to send to the nodes */
1360         rec = ctdb_marshall_record(params->recdata, 0, key, NULL, data);
1361         if (rec == NULL) {
1362                 params->failed = true;
1363                 return -1;
1364         }
1365         if (params->len + rec->length >= params->allocated_len) {
1366                 params->allocated_len = rec->length + params->len + params->ctdb->tunable.pulldb_preallocation_size;
1367                 params->recdata = talloc_realloc_size(NULL, params->recdata, params->allocated_len);
1368         }
1369         if (params->recdata == NULL) {
1370                 DEBUG(DEBUG_CRIT,(__location__ " Failed to expand recdata to %u\n",
1371                          rec->length + params->len));
1372                 params->failed = true;
1373                 return -1;
1374         }
1375         params->recdata->count++;
1376         memcpy(params->len+(uint8_t *)params->recdata, rec, rec->length);
1377         params->len += rec->length;
1378         talloc_free(rec);
1379
1380         return 0;
1381 }
1382
1383 /*
1384   push the recdb database out to all nodes
1385  */
1386 static int push_recdb_database(struct ctdb_context *ctdb, uint32_t dbid,
1387                                bool persistent,
1388                                struct tdb_wrap *recdb, struct ctdb_node_map *nodemap)
1389 {
1390         struct recdb_data params;
1391         struct ctdb_marshall_buffer *recdata;
1392         TDB_DATA outdata;
1393         TALLOC_CTX *tmp_ctx;
1394         uint32_t *nodes;
1395
1396         tmp_ctx = talloc_new(ctdb);
1397         CTDB_NO_MEMORY(ctdb, tmp_ctx);
1398
1399         recdata = talloc_zero(recdb, struct ctdb_marshall_buffer);
1400         CTDB_NO_MEMORY(ctdb, recdata);
1401
1402         recdata->db_id = dbid;
1403
1404         params.ctdb = ctdb;
1405         params.recdata = recdata;
1406         params.len = offsetof(struct ctdb_marshall_buffer, data);
1407         params.allocated_len = params.len;
1408         params.failed = false;
1409         params.persistent = persistent;
1410
1411         if (tdb_traverse_read(recdb->tdb, traverse_recdb, &params) == -1) {
1412                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1413                 talloc_free(params.recdata);
1414                 talloc_free(tmp_ctx);
1415                 return -1;
1416         }
1417
1418         if (params.failed) {
1419                 DEBUG(DEBUG_ERR,(__location__ " Failed to traverse recdb database\n"));
1420                 talloc_free(params.recdata);
1421                 talloc_free(tmp_ctx);
1422                 return -1;              
1423         }
1424
1425         recdata = params.recdata;
1426
1427         outdata.dptr = (void *)recdata;
1428         outdata.dsize = params.len;
1429
1430         nodes = list_of_active_nodes(ctdb, nodemap, tmp_ctx, true);
1431         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_PUSH_DB,
1432                                         nodes, 0,
1433                                         CONTROL_TIMEOUT(), false, outdata,
1434                                         NULL, NULL,
1435                                         NULL) != 0) {
1436                 DEBUG(DEBUG_ERR,(__location__ " Failed to push recdb records to nodes for db 0x%x\n", dbid));
1437                 talloc_free(recdata);
1438                 talloc_free(tmp_ctx);
1439                 return -1;
1440         }
1441
1442         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pushed remote database 0x%x of size %u\n", 
1443                   dbid, recdata->count));
1444
1445         talloc_free(recdata);
1446         talloc_free(tmp_ctx);
1447
1448         return 0;
1449 }
1450
1451
1452 /*
1453   go through a full recovery on one database 
1454  */
1455 static int recover_database(struct ctdb_recoverd *rec, 
1456                             TALLOC_CTX *mem_ctx,
1457                             uint32_t dbid,
1458                             bool persistent,
1459                             uint32_t pnn, 
1460                             struct ctdb_node_map *nodemap,
1461                             uint32_t transaction_id)
1462 {
1463         struct tdb_wrap *recdb;
1464         int ret;
1465         struct ctdb_context *ctdb = rec->ctdb;
1466         TDB_DATA data;
1467         struct ctdb_control_wipe_database w;
1468         uint32_t *nodes;
1469
1470         recdb = create_recdb(ctdb, mem_ctx);
1471         if (recdb == NULL) {
1472                 return -1;
1473         }
1474
1475         /* pull all remote databases onto the recdb */
1476         ret = pull_remote_database(ctdb, rec, nodemap, recdb, dbid, persistent);
1477         if (ret != 0) {
1478                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull remote database 0x%x\n", dbid));
1479                 return -1;
1480         }
1481
1482         DEBUG(DEBUG_NOTICE, (__location__ " Recovery - pulled remote database 0x%x\n", dbid));
1483
1484         /* wipe all the remote databases. This is safe as we are in a transaction */
1485         w.db_id = dbid;
1486         w.transaction_id = transaction_id;
1487
1488         data.dptr = (void *)&w;
1489         data.dsize = sizeof(w);
1490
1491         nodes = list_of_active_nodes(ctdb, nodemap, recdb, true);
1492         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_WIPE_DATABASE,
1493                                         nodes, 0,
1494                                         CONTROL_TIMEOUT(), false, data,
1495                                         NULL, NULL,
1496                                         NULL) != 0) {
1497                 DEBUG(DEBUG_ERR, (__location__ " Unable to wipe database. Recovery failed.\n"));
1498                 talloc_free(recdb);
1499                 return -1;
1500         }
1501         
1502         /* push out the correct database. This sets the dmaster and skips 
1503            the empty records */
1504         ret = push_recdb_database(ctdb, dbid, persistent, recdb, nodemap);
1505         if (ret != 0) {
1506                 talloc_free(recdb);
1507                 return -1;
1508         }
1509
1510         /* all done with this database */
1511         talloc_free(recdb);
1512
1513         return 0;
1514 }
1515
1516 static int ctdb_reload_remote_public_ips(struct ctdb_context *ctdb,
1517                                          struct ctdb_recoverd *rec,
1518                                          struct ctdb_node_map *nodemap,
1519                                          uint32_t *culprit)
1520 {
1521         int j;
1522         int ret;
1523
1524         if (ctdb->num_nodes != nodemap->num) {
1525                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) invalid param\n",
1526                                   ctdb->num_nodes, nodemap->num));
1527                 if (culprit) {
1528                         *culprit = ctdb->pnn;
1529                 }
1530                 return -1;
1531         }
1532
1533         for (j=0; j<nodemap->num; j++) {
1534                 /* For readability */
1535                 struct ctdb_node *node = ctdb->nodes[j];
1536
1537                 /* release any existing data */
1538                 if (node->known_public_ips) {
1539                         talloc_free(node->known_public_ips);
1540                         node->known_public_ips = NULL;
1541                 }
1542                 if (node->available_public_ips) {
1543                         talloc_free(node->available_public_ips);
1544                         node->available_public_ips = NULL;
1545                 }
1546
1547                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
1548                         continue;
1549                 }
1550
1551                 /* Retrieve the list of known public IPs from the node */
1552                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1553                                         CONTROL_TIMEOUT(),
1554                                         node->pnn,
1555                                         ctdb->nodes,
1556                                         0,
1557                                         &node->known_public_ips);
1558                 if (ret != 0) {
1559                         DEBUG(DEBUG_ERR,
1560                               ("Failed to read known public IPs from node: %u\n",
1561                                node->pnn));
1562                         if (culprit) {
1563                                 *culprit = node->pnn;
1564                         }
1565                         return -1;
1566                 }
1567
1568                 if (ctdb->do_checkpublicip &&
1569                     rec->takeover_runs_disable_ctx == NULL &&
1570                     verify_remote_ip_allocation(ctdb,
1571                                                  node->known_public_ips,
1572                                                  node->pnn)) {
1573                         DEBUG(DEBUG_ERR,("Trigger IP reallocation\n"));
1574                         rec->need_takeover_run = true;
1575                 }
1576
1577                 /* Retrieve the list of available public IPs from the node */
1578                 ret = ctdb_ctrl_get_public_ips_flags(ctdb,
1579                                         CONTROL_TIMEOUT(),
1580                                         node->pnn,
1581                                         ctdb->nodes,
1582                                         CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE,
1583                                         &node->available_public_ips);
1584                 if (ret != 0) {
1585                         DEBUG(DEBUG_ERR,
1586                               ("Failed to read available public IPs from node: %u\n",
1587                                node->pnn));
1588                         if (culprit) {
1589                                 *culprit = node->pnn;
1590                         }
1591                         return -1;
1592                 }
1593         }
1594
1595         return 0;
1596 }
1597
1598 /* when we start a recovery, make sure all nodes use the same reclock file
1599    setting
1600 */
1601 static int sync_recovery_lock_file_across_cluster(struct ctdb_recoverd *rec)
1602 {
1603         struct ctdb_context *ctdb = rec->ctdb;
1604         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
1605         TDB_DATA data;
1606         uint32_t *nodes;
1607
1608         if (ctdb->recovery_lock_file == NULL) {
1609                 data.dptr  = NULL;
1610                 data.dsize = 0;
1611         } else {
1612                 data.dsize = strlen(ctdb->recovery_lock_file) + 1;
1613                 data.dptr  = (uint8_t *)ctdb->recovery_lock_file;
1614         }
1615
1616         nodes = list_of_active_nodes(ctdb, rec->nodemap, tmp_ctx, true);
1617         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_SET_RECLOCK_FILE,
1618                                         nodes, 0,
1619                                         CONTROL_TIMEOUT(),
1620                                         false, data,
1621                                         NULL, NULL,
1622                                         rec) != 0) {
1623                 DEBUG(DEBUG_ERR, (__location__ " Failed to sync reclock file settings\n"));
1624                 talloc_free(tmp_ctx);
1625                 return -1;
1626         }
1627
1628         talloc_free(tmp_ctx);
1629         return 0;
1630 }
1631
1632
1633 /*
1634  * this callback is called for every node that failed to execute ctdb_takeover_run()
1635  * and set flag to re-run takeover run.
1636  */
1637 static void takeover_fail_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
1638 {
1639         DEBUG(DEBUG_ERR, ("Node %u failed the takeover run\n", node_pnn));
1640
1641         if (callback_data != NULL) {
1642                 struct ctdb_recoverd *rec = talloc_get_type(callback_data, struct ctdb_recoverd);
1643
1644                 DEBUG(DEBUG_ERR, ("Setting node %u as recovery fail culprit\n", node_pnn));
1645
1646                 ctdb_set_culprit(rec, node_pnn);
1647         }
1648 }
1649
1650
1651 static void ban_misbehaving_nodes(struct ctdb_recoverd *rec, bool *self_ban)
1652 {
1653         struct ctdb_context *ctdb = rec->ctdb;
1654         int i;
1655         struct ctdb_banning_state *ban_state;
1656
1657         *self_ban = false;
1658         for (i=0; i<ctdb->num_nodes; i++) {
1659                 if (ctdb->nodes[i]->ban_state == NULL) {
1660                         continue;
1661                 }
1662                 ban_state = (struct ctdb_banning_state *)ctdb->nodes[i]->ban_state;
1663                 if (ban_state->count < 2*ctdb->num_nodes) {
1664                         continue;
1665                 }
1666
1667                 DEBUG(DEBUG_NOTICE,("Node %u reached %u banning credits - banning it for %u seconds\n",
1668                         ctdb->nodes[i]->pnn, ban_state->count,
1669                         ctdb->tunable.recovery_ban_period));
1670                 ctdb_ban_node(rec, ctdb->nodes[i]->pnn, ctdb->tunable.recovery_ban_period);
1671                 ban_state->count = 0;
1672
1673                 /* Banning ourself? */
1674                 if (ctdb->nodes[i]->pnn == rec->ctdb->pnn) {
1675                         *self_ban = true;
1676                 }
1677         }
1678 }
1679
1680 static bool do_takeover_run(struct ctdb_recoverd *rec,
1681                             struct ctdb_node_map *nodemap,
1682                             bool banning_credits_on_fail)
1683 {
1684         uint32_t *nodes = NULL;
1685         struct srvid_request_data dtr;
1686         TDB_DATA data;
1687         int i;
1688         uint32_t *rebalance_nodes = rec->force_rebalance_nodes;
1689         int ret;
1690         bool ok;
1691
1692         DEBUG(DEBUG_NOTICE, ("Takeover run starting\n"));
1693
1694         if (rec->takeover_run_in_progress) {
1695                 DEBUG(DEBUG_ERR, (__location__
1696                                   " takeover run already in progress \n"));
1697                 ok = false;
1698                 goto done;
1699         }
1700
1701         rec->takeover_run_in_progress = true;
1702
1703         /* If takeover runs are in disabled then fail... */
1704         if (rec->takeover_runs_disable_ctx != NULL) {
1705                 DEBUG(DEBUG_ERR,
1706                       ("Takeover runs are disabled so refusing to run one\n"));
1707                 ok = false;
1708                 goto done;
1709         }
1710
1711         /* Disable IP checks (takeover runs, really) on other nodes
1712          * while doing this takeover run.  This will stop those other
1713          * nodes from triggering takeover runs when think they should
1714          * be hosting an IP but it isn't yet on an interface.  Don't
1715          * wait for replies since a failure here might cause some
1716          * noise in the logs but will not actually cause a problem.
1717          */
1718         dtr.srvid = 0; /* No reply */
1719         dtr.pnn = -1;
1720
1721         data.dptr  = (uint8_t*)&dtr;
1722         data.dsize = sizeof(dtr);
1723
1724         nodes = list_of_connected_nodes(rec->ctdb, nodemap, rec, false);
1725
1726         /* Disable for 60 seconds.  This can be a tunable later if
1727          * necessary.
1728          */
1729         dtr.data = 60;
1730         for (i = 0; i < talloc_array_length(nodes); i++) {
1731                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1732                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1733                                              data) != 0) {
1734                         DEBUG(DEBUG_INFO,("Failed to disable takeover runs\n"));
1735                 }
1736         }
1737
1738         ret = ctdb_takeover_run(rec->ctdb, nodemap,
1739                                 rec->force_rebalance_nodes,
1740                                 takeover_fail_callback,
1741                                 banning_credits_on_fail ? rec : NULL);
1742
1743         /* Reenable takeover runs and IP checks on other nodes */
1744         dtr.data = 0;
1745         for (i = 0; i < talloc_array_length(nodes); i++) {
1746                 if (ctdb_client_send_message(rec->ctdb, nodes[i],
1747                                              CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
1748                                              data) != 0) {
1749                         DEBUG(DEBUG_INFO,("Failed to reenable takeover runs\n"));
1750                 }
1751         }
1752
1753         if (ret != 0) {
1754                 DEBUG(DEBUG_ERR, ("ctdb_takeover_run() failed\n"));
1755                 ok = false;
1756                 goto done;
1757         }
1758
1759         ok = true;
1760         /* Takeover run was successful so clear force rebalance targets */
1761         if (rebalance_nodes == rec->force_rebalance_nodes) {
1762                 TALLOC_FREE(rec->force_rebalance_nodes);
1763         } else {
1764                 DEBUG(DEBUG_WARNING,
1765                       ("Rebalance target nodes changed during takeover run - not clearing\n"));
1766         }
1767 done:
1768         rec->need_takeover_run = !ok;
1769         talloc_free(nodes);
1770         rec->takeover_run_in_progress = false;
1771
1772         DEBUG(DEBUG_NOTICE, ("Takeover run %s\n", ok ? "completed successfully" : "unsuccessful"));
1773         return ok;
1774 }
1775
1776
/*
  we are the recmaster, and recovery is needed - start a recovery run

  rec      - recovery daemon state
  mem_ctx  - talloc context used for the allocations made here (db map,
             node list, new vnn map)
  pnn      - pnn of the local node, which is acting as recovery master
  nodemap  - node map used to drive every cluster-wide step
  vnnmap   - current vnn map; its generation is overwritten below

  The steps are strictly ordered: ban misbehaving nodes, take the
  recovery lock (if one is configured), make sure all databases exist
  on all nodes, switch the cluster to recovery mode, run the
  "startrecovery" event, push node flags, recover every database inside
  a cluster-wide transaction, install a new vnn map, set the recmaster,
  switch back to normal mode, redistribute public IPs, run the
  "recovered" event and finally notify clients.

  Returns 0 on success and -1 on any failure.  rec->need_recovery is set
  on entry and only cleared once every step has succeeded, so an aborted
  recovery is attempted again.
 */
static int do_recovery(struct ctdb_recoverd *rec, 
                       TALLOC_CTX *mem_ctx, uint32_t pnn,
                       struct ctdb_node_map *nodemap, struct ctdb_vnn_map *vnnmap)
{
        struct ctdb_context *ctdb = rec->ctdb;
        int i, j, ret;
        uint32_t generation;
        struct ctdb_dbid_map *dbmap;
        TDB_DATA data;
        uint32_t *nodes;
        struct timeval start_time;
        uint32_t culprit = (uint32_t)-1;
        bool self_ban;

        DEBUG(DEBUG_NOTICE, (__location__ " Starting do_recovery\n"));

        /* if recovery fails, force it again */
        rec->need_recovery = true;

        if (rec->election_timeout) {
                /* an election is in progress */
                DEBUG(DEBUG_ERR, ("do_recovery called while election in progress - try again later\n"));
                return -1;
        }

        /* hand out bans first; if that banned ourself there is no point
           in continuing as recovery master */
        ban_misbehaving_nodes(rec, &self_ban);
        if (self_ban) {
                DEBUG(DEBUG_NOTICE, ("This node was banned, aborting recovery\n"));
                return -1;
        }

        /* the recovery lock is only used when a lock file is configured */
        if (ctdb->recovery_lock_file != NULL) {
                if (ctdb_recovery_have_lock(ctdb)) {
                        DEBUG(DEBUG_NOTICE, ("Already holding recovery lock\n"));
                } else {
                        /* time the lock attempt so the latency can be reported */
                        start_time = timeval_current();
                        DEBUG(DEBUG_NOTICE, ("Attempting to take recovery lock (%s)\n",
                                             ctdb->recovery_lock_file));
                        if (!ctdb_recovery_lock(ctdb)) {
                                if (ctdb->runstate == CTDB_RUNSTATE_FIRST_RECOVERY) {
                                        /* If ctdb is trying first recovery, it's
                                         * possible that current node does not know
                                         * yet who the recmaster is.
                                         */
                                        DEBUG(DEBUG_ERR, ("Unable to get recovery lock"
                                                          " - retrying recovery\n"));
                                        return -1;
                                }

                                /* outside of first recovery, failing to get
                                   the lock gets this node banned */
                                DEBUG(DEBUG_ERR,("Unable to get recovery lock - aborting recovery "
                                                 "and ban ourself for %u seconds\n",
                                                 ctdb->tunable.recovery_ban_period));
                                ctdb_ban_node(rec, pnn, ctdb->tunable.recovery_ban_period);
                                return -1;
                        }
                        ctdb_ctrl_report_recd_lock_latency(ctdb,
                                                           CONTROL_TIMEOUT(),
                                                           timeval_elapsed(&start_time));
                        DEBUG(DEBUG_NOTICE,
                              ("Recovery lock taken successfully by recovery daemon\n"));
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery initiated due to problem with node %u\n", rec->last_culprit_node));

        /* get a list of all databases */
        ret = ctdb_ctrl_getdbmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &dbmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to get dbids from node :%u\n", pnn));
                return -1;
        }

        /* we do the db creation before we set the recovery mode, so the freeze happens
           on all databases we will be dealing with. */

        /* verify that we have all the databases any other node has */
        ret = create_missing_local_databases(ctdb, nodemap, pnn, &dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing local databases\n"));
                return -1;
        }

        /* verify that all other nodes have all our databases */
        ret = create_missing_remote_databases(ctdb, nodemap, pnn, dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to create missing remote databases\n"));
                return -1;
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - created remote databases\n"));

        /* update the database priority for all remote databases.
           A failure here is only logged; the recovery carries on. */
        ret = update_db_priority_on_remote_nodes(ctdb, nodemap, pnn, dbmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set db priority on remote nodes\n"));
        }
        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated db priority for all databases\n"));


        /* update all other nodes to use the same setting for reclock files
           as the local recovery master.
           The return value is not checked, so a failure does not abort
           the recovery.
        */
        sync_recovery_lock_file_across_cluster(rec);

        /* set recovery mode to active on all nodes */
        ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
                return -1;
        }

        /* execute the "startrecovery" event script on all nodes */
        ret = run_startrecovery_eventscript(rec, nodemap);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
                return -1;
        }

        /*
          update all nodes to have the same flags that we have
         */
        for (i=0;i<nodemap->num;i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                ret = update_flags_on_all_nodes(ctdb, nodemap, i, nodemap->nodes[i].flags);
                if (ret != 0) {
                        /* a failure for an inactive node is tolerated,
                           a failure for an active node aborts the recovery */
                        if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                                DEBUG(DEBUG_WARNING, (__location__ "Unable to update flags on inactive node %d\n", i));
                        } else {
                                DEBUG(DEBUG_ERR, (__location__ " Unable to update flags on all nodes for node %d\n", i));
                                return -1;
                        }
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated flags\n"));

        /* pick a new generation number */
        generation = new_generation();

        /* change the vnnmap on this node to use the new generation 
           number but not on any other nodes.
           this guarantees that if we abort the recovery prematurely
           for some reason (a node stops responding?)
           that we can just return immediately and we will reenter
           recovery shortly again.
           I.e. we deliberately leave the cluster with an inconsistent
           generation id to allow us to abort recovery at any stage and
           just restart it from scratch.
         */
        vnnmap->generation = generation;
        ret = ctdb_ctrl_setvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, vnnmap);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set vnnmap for node %u\n", pnn));
                return -1;
        }

        /* the new generation number is the payload of the transaction
           start and commit controls below */
        data.dptr = (void *)&generation;
        data.dsize = sizeof(uint32_t);

        nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_START,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL,
                                        transaction_start_fail_callback,
                                        rec) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to start transactions. Recovery failed.\n"));
                /* try to cancel whatever transactions did get started;
                   the recovery has failed either way */
                if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_CANCEL,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, tdb_null,
                                        NULL,
                                        NULL,
                                        NULL) != 0) {
                        DEBUG(DEBUG_ERR,("Failed to cancel recovery transaction\n"));
                }
                return -1;
        }

        DEBUG(DEBUG_NOTICE,(__location__ " started transactions on all nodes\n"));

        /* recover the databases one at a time; the first failure aborts */
        for (i=0;i<dbmap->num;i++) {
                ret = recover_database(rec, mem_ctx,
                                       dbmap->dbs[i].dbid,
                                       dbmap->dbs[i].flags & CTDB_DB_FLAGS_PERSISTENT,
                                       pnn, nodemap, generation);
                if (ret != 0) {
                        DEBUG(DEBUG_ERR, (__location__ " Failed to recover database 0x%x\n", dbmap->dbs[i].dbid));
                        return -1;
                }
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - starting database commits\n"));

        /* commit all the changes */
        if (ctdb_client_async_control(ctdb, CTDB_CONTROL_TRANSACTION_COMMIT,
                                        nodes, 0,
                                        CONTROL_TIMEOUT(), false, data,
                                        NULL, NULL,
                                        NULL) != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to commit recovery changes. Recovery failed.\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - committed databases\n"));
        

        /* update the capabilities for all nodes */
        ret = update_capabilities(ctdb, nodemap);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
                return -1;
        }

        /* build a new vnn map with all the currently active and
           unbanned nodes.
           From here on vnnmap refers to the newly allocated map, not to
           the one passed in by the caller. */
        generation = new_generation();
        vnnmap = talloc(mem_ctx, struct ctdb_vnn_map);
        CTDB_NO_MEMORY(ctdb, vnnmap);
        vnnmap->generation = generation;
        vnnmap->size = 0;
        vnnmap->map = talloc_zero_array(vnnmap, uint32_t, vnnmap->size);
        CTDB_NO_MEMORY(ctdb, vnnmap->map);
        /* i walks the node map, j is the next free slot in the new map */
        for (i=j=0;i<nodemap->num;i++) {
                if (nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE) {
                        continue;
                }
                if (!(ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER)) {
                        /* this node can not be an lmaster */
                        DEBUG(DEBUG_DEBUG, ("Node %d cant be a LMASTER, skipping it\n", i));
                        continue;
                }

                /* grow the map by one entry and append this node */
                vnnmap->size++;
                vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
                CTDB_NO_MEMORY(ctdb, vnnmap->map);
                vnnmap->map[j++] = nodemap->nodes[i].pnn;

        }
        /* the map must not be left empty: fall back to the local node */
        if (vnnmap->size == 0) {
                DEBUG(DEBUG_NOTICE, ("No suitable lmasters found. Adding local node (recmaster) anyway.\n"));
                vnnmap->size++;
                vnnmap->map = talloc_realloc(vnnmap, vnnmap->map, uint32_t, vnnmap->size);
                CTDB_NO_MEMORY(ctdb, vnnmap->map);
                vnnmap->map[0] = pnn;
        }       

        /* update to the new vnnmap on all nodes */
        ret = update_vnnmap_on_all_nodes(ctdb, nodemap, pnn, vnnmap, mem_ctx);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to update vnnmap on all nodes\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated vnnmap\n"));

        /* update recmaster to point to us for all nodes */
        ret = set_recovery_master(ctdb, nodemap, pnn);
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery master\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - updated recmaster\n"));

        /* disable recovery mode */
        ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_NORMAL);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to normal on cluster\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - disabled recovery mode\n"));

        /* Fetch known/available public IPs from each active node */
        ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
        if (ret != 0) {
                /* NOTE(review): culprit is a uint32_t but is printed with %d */
                DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
                                 culprit));
                rec->need_takeover_run = true;
                return -1;
        }

        /* The result is not checked here; do_takeover_run() records a
           failed run in rec->need_takeover_run itself.  Passing false
           means failing nodes are not recorded as culprits. */
        do_takeover_run(rec, nodemap, false);

        /* execute the "recovered" event script on all nodes */
        ret = run_recovered_eventscript(rec, nodemap, "do_recovery");
        if (ret!=0) {
                DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Recovery process failed.\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery - finished the recovered event\n"));

        /* send a message to all clients telling them that the cluster 
           has been reconfigured */
        ret = ctdb_client_send_message(ctdb, CTDB_BROADCAST_CONNECTED,
                                       CTDB_SRVID_RECONFIGURE, tdb_null);
        if (ret != 0) {
                DEBUG(DEBUG_ERR, (__location__ " Failed to send reconfigure message\n"));
                return -1;
        }

        DEBUG(DEBUG_NOTICE, (__location__ " Recovery complete\n"));

        /* every step succeeded, so no further recovery is pending */
        rec->need_recovery = false;

        /* we managed to complete a full recovery, make sure to forgive
           any past sins by the nodes that could now participate in the
           recovery.
        */
        DEBUG(DEBUG_ERR,("Resetting ban count to 0 for all nodes\n"));
        for (i=0;i<nodemap->num;i++) {
                struct ctdb_banning_state *ban_state;

                if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
                        continue;
                }

                /* note: ctdb->nodes is indexed by pnn here */
                ban_state = (struct ctdb_banning_state *)ctdb->nodes[nodemap->nodes[i].pnn]->ban_state;
                if (ban_state == NULL) {
                        continue;
                }

                ban_state->count = 0;
        }


        /* We just finished a recovery successfully. 
           We now wait for rerecovery_timeout before we allow 
           another recovery to take place.
        */
        DEBUG(DEBUG_NOTICE, ("Just finished a recovery. New recoveries will now be supressed for the rerecovery timeout (%d seconds)\n", ctdb->tunable.rerecovery_timeout));
        ctdb_wait_timeout(ctdb, ctdb->tunable.rerecovery_timeout);
        DEBUG(DEBUG_NOTICE, ("The rerecovery timeout has elapsed. We now allow recoveries to trigger again.\n"));

        return 0;
}
2119
2120
/*
  elections are won by first checking the number of connected nodes, then
  the priority time, then the pnn

  This structure is the payload of an election request: it is sent as
  the raw bytes of the struct, so the field order and types are part of
  the message format.
 */
struct election_message {
        /* number of nodes the sender sees as not disconnected */
        uint32_t num_connected;
        /* the sender's priority time, used as the second tie breaker */
        struct timeval priority_time;
        /* pnn of the sending node, used as the last tie breaker */
        uint32_t pnn;
        /* the sender's own node flags (banned/stopped nodes cannot win) */
        uint32_t node_flags;
};
2131
2132 /*
2133   form this nodes election data
2134  */
2135 static void ctdb_election_data(struct ctdb_recoverd *rec, struct election_message *em)
2136 {
2137         int ret, i;
2138         struct ctdb_node_map *nodemap;
2139         struct ctdb_context *ctdb = rec->ctdb;
2140
2141         ZERO_STRUCTP(em);
2142
2143         em->pnn = rec->ctdb->pnn;
2144         em->priority_time = rec->priority_time;
2145
2146         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, rec, &nodemap);
2147         if (ret != 0) {
2148                 DEBUG(DEBUG_ERR,(__location__ " unable to get node map\n"));
2149                 return;
2150         }
2151
2152         rec->node_flags = nodemap->nodes[ctdb->pnn].flags;
2153         em->node_flags = rec->node_flags;
2154
2155         for (i=0;i<nodemap->num;i++) {
2156                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
2157                         em->num_connected++;
2158                 }
2159         }
2160
2161         /* we shouldnt try to win this election if we cant be a recmaster */
2162         if ((ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2163                 em->num_connected = 0;
2164                 em->priority_time = timeval_current();
2165         }
2166
2167         talloc_free(nodemap);
2168 }
2169
2170 /*
2171   see if the given election data wins
2172  */
2173 static bool ctdb_election_win(struct ctdb_recoverd *rec, struct election_message *em)
2174 {
2175         struct election_message myem;
2176         int cmp = 0;
2177
2178         ctdb_election_data(rec, &myem);
2179
2180         /* we cant win if we dont have the recmaster capability */
2181         if ((rec->ctdb->capabilities & CTDB_CAP_RECMASTER) == 0) {
2182                 return false;
2183         }
2184
2185         /* we cant win if we are banned */
2186         if (rec->node_flags & NODE_FLAGS_BANNED) {
2187                 return false;
2188         }
2189
2190         /* we cant win if we are stopped */
2191         if (rec->node_flags & NODE_FLAGS_STOPPED) {
2192                 return false;
2193         }
2194
2195         /* we will automatically win if the other node is banned */
2196         if (em->node_flags & NODE_FLAGS_BANNED) {
2197                 return true;
2198         }
2199
2200         /* we will automatically win if the other node is banned */
2201         if (em->node_flags & NODE_FLAGS_STOPPED) {
2202                 return true;
2203         }
2204
2205         /* try to use the most connected node */
2206         if (cmp == 0) {
2207                 cmp = (int)myem.num_connected - (int)em->num_connected;
2208         }
2209
2210         /* then the longest running node */
2211         if (cmp == 0) {
2212                 cmp = timeval_compare(&em->priority_time, &myem.priority_time);
2213         }
2214
2215         if (cmp == 0) {
2216                 cmp = (int)myem.pnn - (int)em->pnn;
2217         }
2218
2219         return cmp > 0;
2220 }
2221
2222 /*
2223   send out an election request
2224  */
2225 static int send_election_request(struct ctdb_recoverd *rec, uint32_t pnn)
2226 {
2227         int ret;
2228         TDB_DATA election_data;
2229         struct election_message emsg;
2230         uint64_t srvid;
2231         struct ctdb_context *ctdb = rec->ctdb;
2232
2233         srvid = CTDB_SRVID_RECOVERY;
2234
2235         ctdb_election_data(rec, &emsg);
2236
2237         election_data.dsize = sizeof(struct election_message);
2238         election_data.dptr  = (unsigned char *)&emsg;
2239
2240
2241         /* first we assume we will win the election and set 
2242            recoverymaster to be ourself on the current node
2243          */
2244         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), pnn, pnn);
2245         if (ret != 0) {
2246                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request\n"));
2247                 return -1;
2248         }
2249
2250
2251         /* send an election message to all active nodes */
2252         DEBUG(DEBUG_INFO,(__location__ " Send election request to all active nodes\n"));
2253         return ctdb_client_send_message(ctdb, CTDB_BROADCAST_ALL, srvid, election_data);
2254 }
2255
2256 /*
2257   this function will unban all nodes in the cluster
2258 */
2259 static void unban_all_nodes(struct ctdb_context *ctdb)
2260 {
2261         int ret, i;
2262         struct ctdb_node_map *nodemap;
2263         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2264         
2265         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2266         if (ret != 0) {
2267                 DEBUG(DEBUG_ERR,(__location__ " failed to get nodemap to unban all nodes\n"));
2268                 return;
2269         }
2270
2271         for (i=0;i<nodemap->num;i++) {
2272                 if ( (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED))
2273                   && (nodemap->nodes[i].flags & NODE_FLAGS_BANNED) ) {
2274                         ret = ctdb_ctrl_modflags(ctdb, CONTROL_TIMEOUT(),
2275                                                  nodemap->nodes[i].pnn, 0,
2276                                                  NODE_FLAGS_BANNED);
2277                         if (ret != 0) {
2278                                 DEBUG(DEBUG_ERR, (__location__ " failed to reset ban state\n"));
2279                         }
2280                 }
2281         }
2282
2283         talloc_free(tmp_ctx);
2284 }
2285
2286
2287 /*
2288   we think we are winning the election - send a broadcast election request
2289  */
2290 static void election_send_request(struct event_context *ev, struct timed_event *te, struct timeval t, void *p)
2291 {
2292         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2293         int ret;
2294
2295         ret = send_election_request(rec, ctdb_get_pnn(rec->ctdb));
2296         if (ret != 0) {
2297                 DEBUG(DEBUG_ERR,("Failed to send election request!\n"));
2298         }
2299
2300         talloc_free(rec->send_election_te);
2301         rec->send_election_te = NULL;
2302 }
2303
2304 /*
2305   handler for memory dumps
2306 */
2307 static void mem_dump_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2308                              TDB_DATA data, void *private_data)
2309 {
2310         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2311         TDB_DATA *dump;
2312         int ret;
2313         struct srvid_request *rd;
2314
2315         if (data.dsize != sizeof(struct srvid_request)) {
2316                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2317                 talloc_free(tmp_ctx);
2318                 return;
2319         }
2320         rd = (struct srvid_request *)data.dptr;
2321
2322         dump = talloc_zero(tmp_ctx, TDB_DATA);
2323         if (dump == NULL) {
2324                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate memory for memdump\n"));
2325                 talloc_free(tmp_ctx);
2326                 return;
2327         }
2328         ret = ctdb_dump_memory(ctdb, dump);
2329         if (ret != 0) {
2330                 DEBUG(DEBUG_ERR, (__location__ " ctdb_dump_memory() failed\n"));
2331                 talloc_free(tmp_ctx);
2332                 return;
2333         }
2334
2335 DEBUG(DEBUG_ERR, ("recovery master memory dump\n"));            
2336
2337         ret = ctdb_client_send_message(ctdb, rd->pnn, rd->srvid, *dump);
2338         if (ret != 0) {
2339                 DEBUG(DEBUG_ERR,("Failed to send rd memdump reply message\n"));
2340                 talloc_free(tmp_ctx);
2341                 return;
2342         }
2343
2344         talloc_free(tmp_ctx);
2345 }
2346
2347 /*
2348   handler for getlog
2349 */
2350 static void getlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2351                            TDB_DATA data, void *private_data)
2352 {
2353         struct ctdb_get_log_addr *log_addr;
2354         pid_t child;
2355
2356         if (data.dsize != sizeof(struct ctdb_get_log_addr)) {
2357                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2358                 return;
2359         }
2360         log_addr = (struct ctdb_get_log_addr *)data.dptr;
2361
2362         child = ctdb_fork_no_free_ringbuffer(ctdb);
2363         if (child == (pid_t)-1) {
2364                 DEBUG(DEBUG_ERR,("Failed to fork a log collector child\n"));
2365                 return;
2366         }
2367
2368         if (child == 0) {
2369                 ctdb_set_process_name("ctdb_rec_log_collector");
2370                 if (switch_from_server_to_client(ctdb, "recoverd-log-collector") != 0) {
2371                         DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch log collector child into client mode.\n"));
2372                         _exit(1);
2373                 }
2374                 ctdb_collect_log(ctdb, log_addr);
2375                 _exit(0);
2376         }
2377 }
2378
2379 /*
2380   handler for clearlog
2381 */
2382 static void clearlog_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2383                              TDB_DATA data, void *private_data)
2384 {
2385         ctdb_clear_log(ctdb);
2386 }
2387
2388 /*
2389   handler for reload_nodes
2390 */
2391 static void reload_nodes_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2392                              TDB_DATA data, void *private_data)
2393 {
2394         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2395
2396         DEBUG(DEBUG_ERR, (__location__ " Reload nodes file from recovery daemon\n"));
2397
2398         ctdb_load_nodes_file(rec->ctdb);
2399 }
2400
2401
2402 static void ctdb_rebalance_timeout(struct event_context *ev,
2403                                    struct timed_event *te,
2404                                    struct timeval t, void *p)
2405 {
2406         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2407
2408         if (rec->force_rebalance_nodes == NULL) {
2409                 DEBUG(DEBUG_ERR,
2410                       ("Rebalance timeout occurred - no nodes to rebalance\n"));
2411                 return;
2412         }
2413
2414         DEBUG(DEBUG_NOTICE,
2415               ("Rebalance timeout occurred - do takeover run\n"));
2416         do_takeover_run(rec, rec->nodemap, false);
2417 }
2418
2419         
2420 static void recd_node_rebalance_handler(struct ctdb_context *ctdb,
2421                                         uint64_t srvid,
2422                                         TDB_DATA data, void *private_data)
2423 {
2424         uint32_t pnn;
2425         uint32_t *t;
2426         int len;
2427         uint32_t deferred_rebalance;
2428         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2429
2430         if (rec->recmaster != ctdb_get_pnn(ctdb)) {
2431                 return;
2432         }
2433
2434         if (data.dsize != sizeof(uint32_t)) {
2435                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of node rebalance message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(uint32_t)));
2436                 return;
2437         }
2438
2439         pnn = *(uint32_t *)&data.dptr[0];
2440
2441         DEBUG(DEBUG_NOTICE,("Setting up rebalance of IPs to node %u\n", pnn));
2442
2443         /* Copy any existing list of nodes.  There's probably some
2444          * sort of realloc variant that will do this but we need to
2445          * make sure that freeing the old array also cancels the timer
2446          * event for the timeout... not sure if realloc will do that.
2447          */
2448         len = (rec->force_rebalance_nodes != NULL) ?
2449                 talloc_array_length(rec->force_rebalance_nodes) :
2450                 0;
2451
2452         /* This allows duplicates to be added but they don't cause
2453          * harm.  A call to add a duplicate PNN arguably means that
2454          * the timeout should be reset, so this is the simplest
2455          * solution.
2456          */
2457         t = talloc_zero_array(rec, uint32_t, len+1);
2458         CTDB_NO_MEMORY_VOID(ctdb, t);
2459         if (len > 0) {
2460                 memcpy(t, rec->force_rebalance_nodes, sizeof(uint32_t) * len);
2461         }
2462         t[len] = pnn;
2463
2464         talloc_free(rec->force_rebalance_nodes);
2465
2466         rec->force_rebalance_nodes = t;
2467
2468         /* If configured, setup a deferred takeover run to make sure
2469          * that certain nodes get IPs rebalanced to them.  This will
2470          * be cancelled if a successful takeover run happens before
2471          * the timeout.  Assign tunable value to variable for
2472          * readability.
2473          */
2474         deferred_rebalance = ctdb->tunable.deferred_rebalance_on_node_add;
2475         if (deferred_rebalance != 0) {
2476                 event_add_timed(ctdb->ev, rec->force_rebalance_nodes,
2477                                 timeval_current_ofs(deferred_rebalance, 0),
2478                                 ctdb_rebalance_timeout, rec);
2479         }
2480 }
2481
2482
2483
2484 static void recd_update_ip_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2485                              TDB_DATA data, void *private_data)
2486 {
2487         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2488         struct ctdb_public_ip *ip;
2489
2490         if (rec->recmaster != rec->ctdb->pnn) {
2491                 DEBUG(DEBUG_INFO,("Not recmaster, ignore update ip message\n"));
2492                 return;
2493         }
2494
2495         if (data.dsize != sizeof(struct ctdb_public_ip)) {
2496                 DEBUG(DEBUG_ERR,(__location__ " Incorrect size of recd update ip message. Was %zd but expected %zd bytes\n", data.dsize, sizeof(struct ctdb_public_ip)));
2497                 return;
2498         }
2499
2500         ip = (struct ctdb_public_ip *)data.dptr;
2501
2502         update_ip_assignment_tree(rec->ctdb, ip);
2503 }
2504
2505
2506 static void clear_takeover_runs_disable(struct ctdb_recoverd *rec)
2507 {
2508         TALLOC_FREE(rec->takeover_runs_disable_ctx);
2509 }
2510
2511 static void reenable_takeover_runs(struct event_context *ev,
2512                                    struct timed_event *te,
2513                                    struct timeval yt, void *p)
2514 {
2515         struct ctdb_recoverd *rec = talloc_get_type(p, struct ctdb_recoverd);
2516
2517         DEBUG(DEBUG_NOTICE,("Reenabling takeover runs after timeout\n"));
2518         clear_takeover_runs_disable(rec);
2519 }
2520
2521 static void disable_takeover_runs_handler(struct ctdb_context *ctdb,
2522                                           uint64_t srvid, TDB_DATA data,
2523                                           void *private_data)
2524 {
2525         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2526                                                     struct ctdb_recoverd);
2527         struct srvid_request_data *r;
2528         uint32_t timeout;
2529         TDB_DATA result;
2530         int32_t ret = 0;
2531
2532         /* Validate input data */
2533         if (data.dsize != sizeof(struct srvid_request_data)) {
2534                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2535                                  "expecting %lu\n", (long unsigned)data.dsize,
2536                                  (long unsigned)sizeof(struct srvid_request)));
2537                 return;
2538         }
2539         if (data.dptr == NULL) {
2540                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2541                 return;
2542         }
2543
2544         r = (struct srvid_request_data *)data.dptr;
2545         timeout = r->data;
2546
2547         if (timeout == 0) {
2548                 DEBUG(DEBUG_NOTICE,("Reenabling takeover runs\n"));
2549                 clear_takeover_runs_disable(rec);
2550                 ret = ctdb_get_pnn(ctdb);
2551                 goto done;
2552         }
2553
2554         if (rec->takeover_run_in_progress) {
2555                 DEBUG(DEBUG_ERR,
2556                       ("Unable to disable takeover runs - in progress\n"));
2557                 ret = -EAGAIN;
2558                 goto done;
2559         }
2560
2561         DEBUG(DEBUG_NOTICE,("Disabling takeover runs for %u seconds\n", timeout));
2562
2563         /* Clear any old timers */
2564         clear_takeover_runs_disable(rec);
2565
2566         /* When this is non-NULL it indicates that takeover runs are
2567          * disabled.  This context also holds the timeout timer.
2568          */
2569         rec->takeover_runs_disable_ctx = talloc_new(rec);
2570         if (rec->takeover_runs_disable_ctx == NULL) {
2571                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate memory\n"));
2572                 ret = -ENOMEM;
2573                 goto done;
2574         }
2575
2576         /* Arrange for the timeout to occur */
2577         event_add_timed(ctdb->ev, rec->takeover_runs_disable_ctx,
2578                         timeval_current_ofs(timeout, 0),
2579                         reenable_takeover_runs,
2580                         rec);
2581
2582         /* Returning our PNN tells the caller that we succeeded */
2583         ret = ctdb_get_pnn(ctdb);
2584 done:
2585         result.dsize = sizeof(int32_t);
2586         result.dptr  = (uint8_t *)&ret;
2587         srvid_request_reply(ctdb, (struct srvid_request *)r, result);
2588 }
2589
2590 /* Backward compatibility for this SRVID - call
2591  * disable_takeover_runs_handler() instead
2592  */
2593 static void disable_ip_check_handler(struct ctdb_context *ctdb, uint64_t srvid,
2594                                      TDB_DATA data, void *private_data)
2595 {
2596         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2597                                                     struct ctdb_recoverd);
2598         TDB_DATA data2;
2599         struct srvid_request_data *req;
2600
2601         if (data.dsize != sizeof(uint32_t)) {
2602                 DEBUG(DEBUG_ERR,(__location__ " Wrong size for data :%lu "
2603                                  "expecting %lu\n", (long unsigned)data.dsize,
2604                                  (long unsigned)sizeof(uint32_t)));
2605                 return;
2606         }
2607         if (data.dptr == NULL) {
2608                 DEBUG(DEBUG_ERR,(__location__ " No data received\n"));
2609                 return;
2610         }
2611
2612         req = talloc(ctdb, struct srvid_request_data);
2613         CTDB_NO_MEMORY_VOID(ctdb, req);
2614
2615         req->srvid = 0; /* No reply */
2616         req->pnn = -1;
2617         req->data = *((uint32_t *)data.dptr); /* Timeout */
2618
2619         data2.dsize = sizeof(*req);
2620         data2.dptr = (uint8_t *)req;
2621
2622         disable_takeover_runs_handler(rec->ctdb,
2623                                       CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
2624                                       data2, rec);
2625 }
2626
2627 /*
2628   handler for ip reallocate, just add it to the list of requests and 
2629   handle this later in the monitor_cluster loop so we do not recurse
2630   with other requests to takeover_run()
2631 */
2632 static void ip_reallocate_handler(struct ctdb_context *ctdb, uint64_t srvid,
2633                                   TDB_DATA data, void *private_data)
2634 {
2635         struct srvid_request *request;
2636         struct ctdb_recoverd *rec = talloc_get_type(private_data,
2637                                                     struct ctdb_recoverd);
2638
2639         if (data.dsize != sizeof(struct srvid_request)) {
2640                 DEBUG(DEBUG_ERR, (__location__ " Wrong size of return address.\n"));
2641                 return;
2642         }
2643
2644         request = (struct srvid_request *)data.dptr;
2645
2646         srvid_request_add(ctdb, &rec->reallocate_requests, request);
2647 }
2648
2649 static void process_ipreallocate_requests(struct ctdb_context *ctdb,
2650                                           struct ctdb_recoverd *rec)
2651 {
2652         TDB_DATA result;
2653         int32_t ret;
2654         uint32_t culprit;
2655         struct srvid_requests *current;
2656
2657         DEBUG(DEBUG_INFO, ("recovery master forced ip reallocation\n"));
2658
2659         /* Only process requests that are currently pending.  More
2660          * might come in while the takeover run is in progress and
2661          * they will need to be processed later since they might
2662          * be in response flag changes.
2663          */
2664         current = rec->reallocate_requests;
2665         rec->reallocate_requests = NULL;
2666
2667         /* update the list of public ips that a node can handle for
2668            all connected nodes
2669         */
2670         ret = ctdb_reload_remote_public_ips(ctdb, rec, rec->nodemap, &culprit);
2671         if (ret != 0) {
2672                 DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
2673                                  culprit));
2674                 rec->need_takeover_run = true;
2675         }
2676         if (ret == 0) {
2677                 if (do_takeover_run(rec, rec->nodemap, false)) {
2678                         ret = ctdb_get_pnn(ctdb);
2679                 } else {
2680                         ret = -1;
2681                 }
2682         }
2683
2684         result.dsize = sizeof(int32_t);
2685         result.dptr  = (uint8_t *)&ret;
2686
2687         srvid_requests_reply(ctdb, &current, result);
2688 }
2689
2690
2691 /*
2692   handler for recovery master elections
2693 */
2694 static void election_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2695                              TDB_DATA data, void *private_data)
2696 {
2697         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2698         int ret;
2699         struct election_message *em = (struct election_message *)data.dptr;
2700         TALLOC_CTX *mem_ctx;
2701
2702         /* Ignore election packets from ourself */
2703         if (ctdb->pnn == em->pnn) {
2704                 return;
2705         }
2706
2707         /* we got an election packet - update the timeout for the election */
2708         talloc_free(rec->election_timeout);
2709         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2710                                                 fast_start ?
2711                                                 timeval_current_ofs(0, 500000) :
2712                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2713                                                 ctdb_election_timeout, rec);
2714
2715         mem_ctx = talloc_new(ctdb);
2716
2717         /* someone called an election. check their election data
2718            and if we disagree and we would rather be the elected node, 
2719            send a new election message to all other nodes
2720          */
2721         if (ctdb_election_win(rec, em)) {
2722                 if (!rec->send_election_te) {
2723                         rec->send_election_te = event_add_timed(ctdb->ev, rec, 
2724                                                                 timeval_current_ofs(0, 500000),
2725                                                                 election_send_request, rec);
2726                 }
2727                 talloc_free(mem_ctx);
2728                 /*unban_all_nodes(ctdb);*/
2729                 return;
2730         }
2731
2732         /* we didn't win */
2733         talloc_free(rec->send_election_te);
2734         rec->send_election_te = NULL;
2735
2736         if (ctdb->recovery_lock_file != NULL) {
2737                 /* Release the recovery lock file */
2738                 if (em->pnn != ctdb->pnn &&
2739                     ctdb_recovery_have_lock(ctdb)) {
2740                         ctdb_recovery_unlock(ctdb);
2741                         unban_all_nodes(ctdb);
2742                 }
2743         }
2744
2745         /* ok, let that guy become recmaster then */
2746         ret = ctdb_ctrl_setrecmaster(ctdb, CONTROL_TIMEOUT(), ctdb_get_pnn(ctdb), em->pnn);
2747         if (ret != 0) {
2748                 DEBUG(DEBUG_ERR, (__location__ " failed to send recmaster election request"));
2749                 talloc_free(mem_ctx);
2750                 return;
2751         }
2752
2753         talloc_free(mem_ctx);
2754         return;
2755 }
2756
2757
2758 /*
2759   force the start of the election process
2760  */
2761 static void force_election(struct ctdb_recoverd *rec, uint32_t pnn, 
2762                            struct ctdb_node_map *nodemap)
2763 {
2764         int ret;
2765         struct ctdb_context *ctdb = rec->ctdb;
2766
2767         DEBUG(DEBUG_INFO,(__location__ " Force an election\n"));
2768
2769         /* set all nodes to recovery mode to stop all internode traffic */
2770         ret = set_recovery_mode(ctdb, rec, nodemap, CTDB_RECOVERY_ACTIVE);
2771         if (ret != 0) {
2772                 DEBUG(DEBUG_ERR, (__location__ " Unable to set recovery mode to active on cluster\n"));
2773                 return;
2774         }
2775
2776         talloc_free(rec->election_timeout);
2777         rec->election_timeout = event_add_timed(ctdb->ev, ctdb, 
2778                                                 fast_start ?
2779                                                 timeval_current_ofs(0, 500000) :
2780                                                 timeval_current_ofs(ctdb->tunable.election_timeout, 0), 
2781                                                 ctdb_election_timeout, rec);
2782
2783         ret = send_election_request(rec, pnn);
2784         if (ret!=0) {
2785                 DEBUG(DEBUG_ERR, (__location__ " failed to initiate recmaster election"));
2786                 return;
2787         }
2788
2789         /* wait for a few seconds to collect all responses */
2790         ctdb_wait_election(rec);
2791 }
2792
2793
2794
2795 /*
2796   handler for when a node changes its flags
2797 */
2798 static void monitor_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2799                             TDB_DATA data, void *private_data)
2800 {
2801         int ret;
2802         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2803         struct ctdb_node_map *nodemap=NULL;
2804         TALLOC_CTX *tmp_ctx;
2805         int i;
2806         struct ctdb_recoverd *rec = talloc_get_type(private_data, struct ctdb_recoverd);
2807         int disabled_flag_changed;
2808
2809         if (data.dsize != sizeof(*c)) {
2810                 DEBUG(DEBUG_ERR,(__location__ "Invalid data in ctdb_node_flag_change\n"));
2811                 return;
2812         }
2813
2814         tmp_ctx = talloc_new(ctdb);
2815         CTDB_NO_MEMORY_VOID(ctdb, tmp_ctx);
2816
2817         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &nodemap);
2818         if (ret != 0) {
2819                 DEBUG(DEBUG_ERR,(__location__ "ctdb_ctrl_getnodemap failed in monitor_handler\n"));
2820                 talloc_free(tmp_ctx);
2821                 return;         
2822         }
2823
2824
2825         for (i=0;i<nodemap->num;i++) {
2826                 if (nodemap->nodes[i].pnn == c->pnn) break;
2827         }
2828
2829         if (i == nodemap->num) {
2830                 DEBUG(DEBUG_CRIT,(__location__ "Flag change for non-existant node %u\n", c->pnn));
2831                 talloc_free(tmp_ctx);
2832                 return;
2833         }
2834
2835         if (c->old_flags != c->new_flags) {
2836                 DEBUG(DEBUG_NOTICE,("Node %u has changed flags - now 0x%x  was 0x%x\n", c->pnn, c->new_flags, c->old_flags));
2837         }
2838
2839         disabled_flag_changed =  (nodemap->nodes[i].flags ^ c->new_flags) & NODE_FLAGS_DISABLED;
2840
2841         nodemap->nodes[i].flags = c->new_flags;
2842
2843         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2844                                      CTDB_CURRENT_NODE, &ctdb->recovery_master);
2845
2846         if (ret == 0) {
2847                 ret = ctdb_ctrl_getrecmode(ctdb, tmp_ctx, CONTROL_TIMEOUT(), 
2848                                            CTDB_CURRENT_NODE, &ctdb->recovery_mode);
2849         }
2850         
2851         if (ret == 0 &&
2852             ctdb->recovery_master == ctdb->pnn &&
2853             ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
2854                 /* Only do the takeover run if the perm disabled or unhealthy
2855                    flags changed since these will cause an ip failover but not
2856                    a recovery.
2857                    If the node became disconnected or banned this will also
2858                    lead to an ip address failover but that is handled 
2859                    during recovery
2860                 */
2861                 if (disabled_flag_changed) {
2862                         rec->need_takeover_run = true;
2863                 }
2864         }
2865
2866         talloc_free(tmp_ctx);
2867 }
2868
2869 /*
2870   handler for when we need to push out flag changes ot all other nodes
2871 */
2872 static void push_flags_handler(struct ctdb_context *ctdb, uint64_t srvid, 
2873                             TDB_DATA data, void *private_data)
2874 {
2875         int ret;
2876         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
2877         struct ctdb_node_map *nodemap=NULL;
2878         TALLOC_CTX *tmp_ctx = talloc_new(ctdb);
2879         uint32_t recmaster;
2880         uint32_t *nodes;
2881
2882         /* find the recovery master */
2883         ret = ctdb_ctrl_getrecmaster(ctdb, tmp_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &recmaster);
2884         if (ret != 0) {
2885                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from local node\n"));
2886                 talloc_free(tmp_ctx);
2887                 return;
2888         }
2889
2890         /* read the node flags from the recmaster */
2891         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), recmaster, tmp_ctx, &nodemap);
2892         if (ret != 0) {
2893                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", c->pnn));
2894                 talloc_free(tmp_ctx);
2895                 return;
2896         }
2897         if (c->pnn >= nodemap->num) {
2898                 DEBUG(DEBUG_ERR,(__location__ " Nodemap from recmaster does not contain node %d\n", c->pnn));
2899                 talloc_free(tmp_ctx);
2900                 return;
2901         }
2902
2903         /* send the flags update to all connected nodes */
2904         nodes = list_of_connected_nodes(ctdb, nodemap, tmp_ctx, true);
2905
2906         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_MODIFY_FLAGS,
2907                                       nodes, 0, CONTROL_TIMEOUT(),
2908                                       false, data,
2909                                       NULL, NULL,
2910                                       NULL) != 0) {
2911                 DEBUG(DEBUG_ERR, (__location__ " ctdb_control to modify node flags failed\n"));
2912
2913                 talloc_free(tmp_ctx);
2914                 return;
2915         }
2916
2917         talloc_free(tmp_ctx);
2918 }
2919
2920
2921 struct verify_recmode_normal_data {
2922         uint32_t count;
2923         enum monitor_result status;
2924 };
2925
2926 static void verify_recmode_normal_callback(struct ctdb_client_control_state *state)
2927 {
2928         struct verify_recmode_normal_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmode_normal_data);
2929
2930
2931         /* one more node has responded with recmode data*/
2932         rmdata->count--;
2933
2934         /* if we failed to get the recmode, then return an error and let
2935            the main loop try again.
2936         */
2937         if (state->state != CTDB_CONTROL_DONE) {
2938                 if (rmdata->status == MONITOR_OK) {
2939                         rmdata->status = MONITOR_FAILED;
2940                 }
2941                 return;
2942         }
2943
2944         /* if we got a response, then the recmode will be stored in the
2945            status field
2946         */
2947         if (state->status != CTDB_RECOVERY_NORMAL) {
2948                 DEBUG(DEBUG_NOTICE, ("Node:%u was in recovery mode. Start recovery process\n", state->c->hdr.destnode));
2949                 rmdata->status = MONITOR_RECOVERY_NEEDED;
2950         }
2951
2952         return;
2953 }
2954
2955
2956 /* verify that all nodes are in normal recovery mode */
2957 static enum monitor_result verify_recmode(struct ctdb_context *ctdb, struct ctdb_node_map *nodemap)
2958 {
2959         struct verify_recmode_normal_data *rmdata;
2960         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
2961         struct ctdb_client_control_state *state;
2962         enum monitor_result status;
2963         int j;
2964         
2965         rmdata = talloc(mem_ctx, struct verify_recmode_normal_data);
2966         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
2967         rmdata->count  = 0;
2968         rmdata->status = MONITOR_OK;
2969
2970         /* loop over all active nodes and send an async getrecmode call to 
2971            them*/
2972         for (j=0; j<nodemap->num; j++) {
2973                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
2974                         continue;
2975                 }
2976                 state = ctdb_ctrl_getrecmode_send(ctdb, mem_ctx, 
2977                                         CONTROL_TIMEOUT(), 
2978                                         nodemap->nodes[j].pnn);
2979                 if (state == NULL) {
2980                         /* we failed to send the control, treat this as 
2981                            an error and try again next iteration
2982                         */                      
2983                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmode_send during monitoring\n"));
2984                         talloc_free(mem_ctx);
2985                         return MONITOR_FAILED;
2986                 }
2987
2988                 /* set up the callback functions */
2989                 state->async.fn = verify_recmode_normal_callback;
2990                 state->async.private_data = rmdata;
2991
2992                 /* one more control to wait for to complete */
2993                 rmdata->count++;
2994         }
2995
2996
2997         /* now wait for up to the maximum number of seconds allowed
2998            or until all nodes we expect a response from has replied
2999         */
3000         while (rmdata->count > 0) {
3001                 event_loop_once(ctdb->ev);
3002         }
3003
3004         status = rmdata->status;
3005         talloc_free(mem_ctx);
3006         return status;
3007 }
3008
3009
3010 struct verify_recmaster_data {
3011         struct ctdb_recoverd *rec;
3012         uint32_t count;
3013         uint32_t pnn;
3014         enum monitor_result status;
3015 };
3016
3017 static void verify_recmaster_callback(struct ctdb_client_control_state *state)
3018 {
3019         struct verify_recmaster_data *rmdata = talloc_get_type(state->async.private_data, struct verify_recmaster_data);
3020
3021
3022         /* one more node has responded with recmaster data*/
3023         rmdata->count--;
3024
3025         /* if we failed to get the recmaster, then return an error and let
3026            the main loop try again.
3027         */
3028         if (state->state != CTDB_CONTROL_DONE) {
3029                 if (rmdata->status == MONITOR_OK) {
3030                         rmdata->status = MONITOR_FAILED;
3031                 }
3032                 return;
3033         }
3034
3035         /* if we got a response, then the recmaster will be stored in the
3036            status field
3037         */
3038         if (state->status != rmdata->pnn) {
3039                 DEBUG(DEBUG_ERR,("Node %d thinks node %d is recmaster. Need a new recmaster election\n", state->c->hdr.destnode, state->status));
3040                 ctdb_set_culprit(rmdata->rec, state->c->hdr.destnode);
3041                 rmdata->status = MONITOR_ELECTION_NEEDED;
3042         }
3043
3044         return;
3045 }
3046
3047
3048 /* verify that all nodes agree that we are the recmaster */
3049 static enum monitor_result verify_recmaster(struct ctdb_recoverd *rec, struct ctdb_node_map *nodemap, uint32_t pnn)
3050 {
3051         struct ctdb_context *ctdb = rec->ctdb;
3052         struct verify_recmaster_data *rmdata;
3053         TALLOC_CTX *mem_ctx = talloc_new(ctdb);
3054         struct ctdb_client_control_state *state;
3055         enum monitor_result status;
3056         int j;
3057         
3058         rmdata = talloc(mem_ctx, struct verify_recmaster_data);
3059         CTDB_NO_MEMORY_FATAL(ctdb, rmdata);
3060         rmdata->rec    = rec;
3061         rmdata->count  = 0;
3062         rmdata->pnn    = pnn;
3063         rmdata->status = MONITOR_OK;
3064
3065         /* loop over all active nodes and send an async getrecmaster call to 
3066            them*/
3067         for (j=0; j<nodemap->num; j++) {
3068                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3069                         continue;
3070                 }
3071                 state = ctdb_ctrl_getrecmaster_send(ctdb, mem_ctx, 
3072                                         CONTROL_TIMEOUT(),
3073                                         nodemap->nodes[j].pnn);
3074                 if (state == NULL) {
3075                         /* we failed to send the control, treat this as 
3076                            an error and try again next iteration
3077                         */                      
3078                         DEBUG(DEBUG_ERR,("Failed to call ctdb_ctrl_getrecmaster_send during monitoring\n"));
3079                         talloc_free(mem_ctx);
3080                         return MONITOR_FAILED;
3081                 }
3082
3083                 /* set up the callback functions */
3084                 state->async.fn = verify_recmaster_callback;
3085                 state->async.private_data = rmdata;
3086
3087                 /* one more control to wait for to complete */
3088                 rmdata->count++;
3089         }
3090
3091
3092         /* now wait for up to the maximum number of seconds allowed
3093            or until all nodes we expect a response from has replied
3094         */
3095         while (rmdata->count > 0) {
3096                 event_loop_once(ctdb->ev);
3097         }
3098
3099         status = rmdata->status;
3100         talloc_free(mem_ctx);
3101         return status;
3102 }
3103
3104 static bool interfaces_have_changed(struct ctdb_context *ctdb,
3105                                     struct ctdb_recoverd *rec)
3106 {
3107         struct ctdb_control_get_ifaces *ifaces = NULL;
3108         TALLOC_CTX *mem_ctx;
3109         bool ret = false;
3110
3111         mem_ctx = talloc_new(NULL);
3112
3113         /* Read the interfaces from the local node */
3114         if (ctdb_ctrl_get_ifaces(ctdb, CONTROL_TIMEOUT(),
3115                                  CTDB_CURRENT_NODE, mem_ctx, &ifaces) != 0) {
3116                 DEBUG(DEBUG_ERR, ("Unable to get interfaces from local node %u\n", ctdb->pnn));
3117                 /* We could return an error.  However, this will be
3118                  * rare so we'll decide that the interfaces have
3119                  * actually changed, just in case.
3120                  */
3121                 talloc_free(mem_ctx);
3122                 return true;
3123         }
3124
3125         if (!rec->ifaces) {
3126                 /* We haven't been here before so things have changed */
3127                 DEBUG(DEBUG_NOTICE, ("Initial interface fetched\n"));
3128                 ret = true;
3129         } else if (rec->ifaces->num != ifaces->num) {
3130                 /* Number of interfaces has changed */
3131                 DEBUG(DEBUG_NOTICE, ("Interface count changed from %d to %d\n",
3132                                      rec->ifaces->num, ifaces->num));
3133                 ret = true;
3134         } else {
3135                 /* See if interface names or link states have changed */
3136                 int i;
3137                 for (i = 0; i < rec->ifaces->num; i++) {
3138                         struct ctdb_control_iface_info * iface = &rec->ifaces->ifaces[i];
3139                         if (strcmp(iface->name, ifaces->ifaces[i].name) != 0) {
3140                                 DEBUG(DEBUG_NOTICE,
3141                                       ("Interface in slot %d changed: %s => %s\n",
3142                                        i, iface->name, ifaces->ifaces[i].name));
3143                                 ret = true;
3144                                 break;
3145                         }
3146                         if (iface->link_state != ifaces->ifaces[i].link_state) {
3147                                 DEBUG(DEBUG_NOTICE,
3148                                       ("Interface %s changed state: %d => %d\n",
3149                                        iface->name, iface->link_state,
3150                                        ifaces->ifaces[i].link_state));
3151                                 ret = true;
3152                                 break;
3153                         }
3154                 }
3155         }
3156
3157         talloc_free(rec->ifaces);
3158         rec->ifaces = talloc_steal(rec, ifaces);
3159
3160         talloc_free(mem_ctx);
3161         return ret;
3162 }
3163
3164 /* called to check that the local allocation of public ip addresses is ok.
3165 */
3166 static int verify_local_ip_allocation(struct ctdb_context *ctdb, struct ctdb_recoverd *rec, uint32_t pnn, struct ctdb_node_map *nodemap)
3167 {
3168         TALLOC_CTX *mem_ctx = talloc_new(NULL);
3169         struct ctdb_uptime *uptime1 = NULL;
3170         struct ctdb_uptime *uptime2 = NULL;
3171         int ret, j;
3172         bool need_takeover_run = false;
3173
3174         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3175                                 CTDB_CURRENT_NODE, &uptime1);
3176         if (ret != 0) {
3177                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3178                 talloc_free(mem_ctx);
3179                 return -1;
3180         }
3181
3182         if (interfaces_have_changed(ctdb, rec)) {
3183                 DEBUG(DEBUG_NOTICE, ("The interfaces status has changed on "
3184                                      "local node %u - force takeover run\n",
3185                                      pnn));
3186                 need_takeover_run = true;
3187         }
3188
3189         ret = ctdb_ctrl_uptime(ctdb, mem_ctx, CONTROL_TIMEOUT(),
3190                                 CTDB_CURRENT_NODE, &uptime2);
3191         if (ret != 0) {
3192                 DEBUG(DEBUG_ERR, ("Unable to get uptime from local node %u\n", pnn));
3193                 talloc_free(mem_ctx);
3194                 return -1;
3195         }
3196
3197         /* skip the check if the startrecovery time has changed */
3198         if (timeval_compare(&uptime1->last_recovery_started,
3199                             &uptime2->last_recovery_started) != 0) {
3200                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3201                 talloc_free(mem_ctx);
3202                 return 0;
3203         }
3204
3205         /* skip the check if the endrecovery time has changed */
3206         if (timeval_compare(&uptime1->last_recovery_finished,
3207                             &uptime2->last_recovery_finished) != 0) {
3208                 DEBUG(DEBUG_NOTICE, (__location__ " last recovery time changed while we read the public ip list. skipping public ip address check\n"));
3209                 talloc_free(mem_ctx);
3210                 return 0;
3211         }
3212
3213         /* skip the check if we have started but not finished recovery */
3214         if (timeval_compare(&uptime1->last_recovery_finished,
3215                             &uptime1->last_recovery_started) != 1) {
3216                 DEBUG(DEBUG_INFO, (__location__ " in the middle of recovery or ip reallocation. skipping public ip address check\n"));
3217                 talloc_free(mem_ctx);
3218
3219                 return 0;
3220         }
3221
3222         /* verify that we have the ip addresses we should have
3223            and we dont have ones we shouldnt have.
3224            if we find an inconsistency we set recmode to
3225            active on the local node and wait for the recmaster
3226            to do a full blown recovery.
3227            also if the pnn is -1 and we are healthy and can host the ip
3228            we also request a ip reallocation.
3229         */
3230         if (ctdb->tunable.disable_ip_failover == 0) {
3231                 struct ctdb_all_public_ips *ips = NULL;
3232
3233                 /* read the *available* IPs from the local node */
3234                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, CTDB_PUBLIC_IP_FLAGS_ONLY_AVAILABLE, &ips);
3235                 if (ret != 0) {
3236                         DEBUG(DEBUG_ERR, ("Unable to get available public IPs from local node %u\n", pnn));
3237                         talloc_free(mem_ctx);
3238                         return -1;
3239                 }
3240
3241                 for (j=0; j<ips->num; j++) {
3242                         if (ips->ips[j].pnn == -1 &&
3243                             nodemap->nodes[pnn].flags == 0) {
3244                                 DEBUG(DEBUG_CRIT,("Public IP '%s' is not assigned and we could serve it\n",
3245                                                   ctdb_addr_to_str(&ips->ips[j].addr)));
3246                                 need_takeover_run = true;
3247                         }
3248                 }
3249
3250                 talloc_free(ips);
3251
3252                 /* read the *known* IPs from the local node */
3253                 ret = ctdb_ctrl_get_public_ips_flags(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, mem_ctx, 0, &ips);
3254                 if (ret != 0) {
3255                         DEBUG(DEBUG_ERR, ("Unable to get known public IPs from local node %u\n", pnn));
3256                         talloc_free(mem_ctx);
3257                         return -1;
3258                 }
3259
3260                 for (j=0; j<ips->num; j++) {
3261                         if (ips->ips[j].pnn == pnn) {
3262                                 if (ctdb->do_checkpublicip && !ctdb_sys_have_ip(&ips->ips[j].addr)) {
3263                                         DEBUG(DEBUG_CRIT,("Public IP '%s' is assigned to us but not on an interface\n",
3264                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3265                                         need_takeover_run = true;
3266                                 }
3267                         } else {
3268                                 if (ctdb->do_checkpublicip &&
3269                                     ctdb_sys_have_ip(&ips->ips[j].addr)) {
3270
3271                                         DEBUG(DEBUG_CRIT,("We are still serving a public IP '%s' that we should not be serving. Removing it\n", 
3272                                                 ctdb_addr_to_str(&ips->ips[j].addr)));
3273
3274                                         if (ctdb_ctrl_release_ip(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ips->ips[j]) != 0) {
3275                                                 DEBUG(DEBUG_ERR,("Failed to release local IP address\n"));
3276                                         }
3277                                 }
3278                         }
3279                 }
3280         }
3281
3282         if (need_takeover_run) {
3283                 struct srvid_request rd;
3284                 TDB_DATA data;
3285
3286                 DEBUG(DEBUG_CRIT,("Trigger takeoverrun\n"));
3287
3288                 rd.pnn = ctdb->pnn;
3289                 rd.srvid = 0;
3290                 data.dptr = (uint8_t *)&rd;
3291                 data.dsize = sizeof(rd);
3292
3293                 ret = ctdb_client_send_message(ctdb, rec->recmaster, CTDB_SRVID_TAKEOVER_RUN, data);
3294                 if (ret != 0) {
3295                         DEBUG(DEBUG_ERR,(__location__ " Failed to send ipreallocate to recmaster :%d\n", (int)rec->recmaster));
3296                 }
3297         }
3298         talloc_free(mem_ctx);
3299         return 0;
3300 }
3301
3302
3303 static void async_getnodemap_callback(struct ctdb_context *ctdb, uint32_t node_pnn, int32_t res, TDB_DATA outdata, void *callback_data)
3304 {
3305         struct ctdb_node_map **remote_nodemaps = callback_data;
3306
3307         if (node_pnn >= ctdb->num_nodes) {
3308                 DEBUG(DEBUG_ERR,(__location__ " pnn from invalid node\n"));
3309                 return;
3310         }
3311
3312         remote_nodemaps[node_pnn] = (struct ctdb_node_map *)talloc_steal(remote_nodemaps, outdata.dptr);
3313
3314 }
3315
3316 static int get_remote_nodemaps(struct ctdb_context *ctdb, TALLOC_CTX *mem_ctx,
3317         struct ctdb_node_map *nodemap,
3318         struct ctdb_node_map **remote_nodemaps)
3319 {
3320         uint32_t *nodes;
3321
3322         nodes = list_of_active_nodes(ctdb, nodemap, mem_ctx, true);
3323         if (ctdb_client_async_control(ctdb, CTDB_CONTROL_GET_NODEMAP,
3324                                         nodes, 0,
3325                                         CONTROL_TIMEOUT(), false, tdb_null,
3326                                         async_getnodemap_callback,
3327                                         NULL,
3328                                         remote_nodemaps) != 0) {
3329                 DEBUG(DEBUG_ERR, (__location__ " Unable to pull all remote nodemaps\n"));
3330
3331                 return -1;
3332         }
3333
3334         return 0;
3335 }
3336
3337 static int update_recovery_lock_file(struct ctdb_context *ctdb)
3338 {
3339         TALLOC_CTX *tmp_ctx = talloc_new(NULL);
3340         const char *reclockfile;
3341
3342         if (ctdb_ctrl_getreclock(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, tmp_ctx, &reclockfile) != 0) {
3343                 DEBUG(DEBUG_ERR,("Failed to read reclock file from daemon\n"));
3344                 talloc_free(tmp_ctx);
3345                 return -1;      
3346         }
3347
3348         if (reclockfile == NULL) {
3349                 if (ctdb->recovery_lock_file != NULL) {
3350                         DEBUG(DEBUG_NOTICE,("Recovery lock file disabled\n"));
3351                         talloc_free(ctdb->recovery_lock_file);
3352                         ctdb->recovery_lock_file = NULL;
3353                         ctdb_recovery_unlock(ctdb);
3354                 }
3355                 talloc_free(tmp_ctx);
3356                 return 0;
3357         }
3358
3359         if (ctdb->recovery_lock_file == NULL) {
3360                 DEBUG(DEBUG_NOTICE,
3361                       ("Recovery lock file enabled (%s)\n", reclockfile));
3362                 ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3363                 ctdb_recovery_unlock(ctdb);
3364                 talloc_free(tmp_ctx);
3365                 return 0;
3366         }
3367
3368
3369         if (!strcmp(reclockfile, ctdb->recovery_lock_file)) {
3370                 talloc_free(tmp_ctx);
3371                 return 0;
3372         }
3373
3374         DEBUG(DEBUG_NOTICE,
3375               ("Recovery lock file changed (now %s)\n", reclockfile));
3376         talloc_free(ctdb->recovery_lock_file);
3377         ctdb->recovery_lock_file = talloc_strdup(ctdb, reclockfile);
3378         ctdb_recovery_unlock(ctdb);
3379
3380         talloc_free(tmp_ctx);
3381         return 0;
3382 }
3383
3384 static void main_loop(struct ctdb_context *ctdb, struct ctdb_recoverd *rec,
3385                       TALLOC_CTX *mem_ctx)
3386 {
3387         uint32_t pnn;
3388         struct ctdb_node_map *nodemap=NULL;
3389         struct ctdb_node_map *recmaster_nodemap=NULL;
3390         struct ctdb_node_map **remote_nodemaps=NULL;
3391         struct ctdb_vnn_map *vnnmap=NULL;
3392         struct ctdb_vnn_map *remote_vnnmap=NULL;
3393         int32_t debug_level;
3394         int i, j, ret;
3395         bool self_ban;
3396
3397
3398         /* verify that the main daemon is still running */
3399         if (ctdb_kill(ctdb, ctdb->ctdbd_pid, 0) != 0) {
3400                 DEBUG(DEBUG_CRIT,("CTDB daemon is no longer available. Shutting down recovery daemon\n"));
3401                 exit(-1);
3402         }
3403
3404         /* ping the local daemon to tell it we are alive */
3405         ctdb_ctrl_recd_ping(ctdb);
3406
3407         if (rec->election_timeout) {
3408                 /* an election is in progress */
3409                 return;
3410         }
3411
3412         /* read the debug level from the parent and update locally */
3413         ret = ctdb_ctrl_get_debuglevel(ctdb, CTDB_CURRENT_NODE, &debug_level);
3414         if (ret !=0) {
3415                 DEBUG(DEBUG_ERR, (__location__ " Failed to read debuglevel from parent\n"));
3416                 return;
3417         }
3418         LogLevel = debug_level;
3419
3420         /* get relevant tunables */
3421         ret = ctdb_ctrl_get_all_tunables(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->tunable);
3422         if (ret != 0) {
3423                 DEBUG(DEBUG_ERR,("Failed to get tunables - retrying\n"));
3424                 return;
3425         }
3426
3427         /* get runstate */
3428         ret = ctdb_ctrl_get_runstate(ctdb, CONTROL_TIMEOUT(),
3429                                      CTDB_CURRENT_NODE, &ctdb->runstate);
3430         if (ret != 0) {
3431                 DEBUG(DEBUG_ERR, ("Failed to get runstate - retrying\n"));
3432                 return;
3433         }
3434
3435         /* get the current recovery lock file from the server */
3436         if (update_recovery_lock_file(ctdb) != 0) {
3437                 DEBUG(DEBUG_ERR,("Failed to update the recovery lock file\n"));
3438                 return;
3439         }
3440
3441         /* Make sure that if recovery lock verification becomes disabled when
3442            we close the file
3443         */
3444         if (ctdb->recovery_lock_file == NULL) {
3445                 ctdb_recovery_unlock(ctdb);
3446         }
3447
3448         pnn = ctdb_get_pnn(ctdb);
3449
3450         /* get the vnnmap */
3451         ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), pnn, mem_ctx, &vnnmap);
3452         if (ret != 0) {
3453                 DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from node %u\n", pnn));
3454                 return;
3455         }
3456
3457
3458         /* get number of nodes */
3459         if (rec->nodemap) {
3460                 talloc_free(rec->nodemap);
3461                 rec->nodemap = NULL;
3462                 nodemap=NULL;
3463         }
3464         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), pnn, rec, &rec->nodemap);
3465         if (ret != 0) {
3466                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from node %u\n", pnn));
3467                 return;
3468         }
3469         nodemap = rec->nodemap;
3470
3471         /* remember our own node flags */
3472         rec->node_flags = nodemap->nodes[pnn].flags;
3473
3474         ban_misbehaving_nodes(rec, &self_ban);
3475         if (self_ban) {
3476                 DEBUG(DEBUG_NOTICE, ("This node was banned, restart main_loop\n"));
3477                 return;
3478         }
3479
3480         /* if the local daemon is STOPPED or BANNED, we verify that the databases are
3481            also frozen and that the recmode is set to active.
3482         */
3483         if (rec->node_flags & (NODE_FLAGS_STOPPED | NODE_FLAGS_BANNED)) {
3484                 /* If this node has become inactive then we want to
3485                  * reduce the chances of it taking over the recovery
3486                  * master role when it becomes active again.  This
3487                  * helps to stabilise the recovery master role so that
3488                  * it stays on the most stable node.
3489                  */
3490                 rec->priority_time = timeval_current();
3491
3492                 ret = ctdb_ctrl_getrecmode(ctdb, mem_ctx, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, &ctdb->recovery_mode);
3493                 if (ret != 0) {
3494                         DEBUG(DEBUG_ERR,(__location__ " Failed to read recmode from local node\n"));
3495                 }
3496                 if (ctdb->recovery_mode == CTDB_RECOVERY_NORMAL) {
3497                         DEBUG(DEBUG_ERR,("Node is stopped or banned but recovery mode is not active. Activate recovery mode and lock databases\n"));
3498
3499                         ret = ctdb_ctrl_setrecmode(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE, CTDB_RECOVERY_ACTIVE);
3500                         if (ret != 0) {
3501                                 DEBUG(DEBUG_ERR,(__location__ " Failed to activate recovery mode in STOPPED or BANNED state\n"));
3502
3503                                 return;
3504                         }
3505                         ret = ctdb_ctrl_freeze(ctdb, CONTROL_TIMEOUT(), CTDB_CURRENT_NODE);
3506                         if (ret != 0) {
3507                                 DEBUG(DEBUG_ERR,(__location__ " Failed to freeze node in STOPPED or BANNED state\n"));
3508                                 return;
3509                         }
3510                 }
3511
3512                 /* If this node is stopped or banned then it is not the recovery
3513                  * master, so don't do anything. This prevents stopped or banned
3514                  * node from starting election and sending unnecessary controls.
3515                  */
3516                 return;
3517         }
3518
3519         /* check which node is the recovery master */
3520         ret = ctdb_ctrl_getrecmaster(ctdb, mem_ctx, CONTROL_TIMEOUT(), pnn, &rec->recmaster);
3521         if (ret != 0) {
3522                 DEBUG(DEBUG_ERR, (__location__ " Unable to get recmaster from node %u\n", pnn));
3523                 return;
3524         }
3525
3526         /* If we are not the recmaster then do some housekeeping */
3527         if (rec->recmaster != pnn) {
3528                 /* Ignore any IP reallocate requests - only recmaster
3529                  * processes them
3530                  */
3531                 TALLOC_FREE(rec->reallocate_requests);
3532                 /* Clear any nodes that should be force rebalanced in
3533                  * the next takeover run.  If the recovery master role
3534                  * has moved then we don't want to process these some
3535                  * time in the future.
3536                  */
3537                 TALLOC_FREE(rec->force_rebalance_nodes);
3538         }
3539
3540         /* This is a special case.  When recovery daemon is started, recmaster
3541          * is set to -1.  If a node is not started in stopped state, then
3542          * start election to decide recovery master
3543          */
3544         if (rec->recmaster == (uint32_t)-1) {
3545                 DEBUG(DEBUG_NOTICE,(__location__ " Initial recovery master set - forcing election\n"));
3546                 force_election(rec, pnn, nodemap);
3547                 return;
3548         }
3549
3550         /* update the capabilities for all nodes */
3551         ret = update_capabilities(ctdb, nodemap);
3552         if (ret != 0) {
3553                 DEBUG(DEBUG_ERR, (__location__ " Unable to update node capabilities.\n"));
3554                 return;
3555         }
3556
3557         /*
3558          * If the current recmaster does not have CTDB_CAP_RECMASTER,
3559          * but we have, then force an election and try to become the new
3560          * recmaster.
3561          */
3562         if ((rec->ctdb->nodes[rec->recmaster]->capabilities & CTDB_CAP_RECMASTER) == 0 &&
3563             (rec->ctdb->capabilities & CTDB_CAP_RECMASTER) &&
3564              !(nodemap->nodes[pnn].flags & NODE_FLAGS_INACTIVE)) {
3565                 DEBUG(DEBUG_ERR, (__location__ " Current recmaster node %u does not have CAP_RECMASTER,"
3566                                   " but we (node %u) have - force an election\n",
3567                                   rec->recmaster, pnn));
3568                 force_election(rec, pnn, nodemap);
3569                 return;
3570         }
3571
3572         /* count how many active nodes there are */
3573         rec->num_active    = 0;
3574         rec->num_lmasters  = 0;
3575         rec->num_connected = 0;
3576         for (i=0; i<nodemap->num; i++) {
3577                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_INACTIVE)) {
3578                         rec->num_active++;
3579                         if (rec->ctdb->nodes[i]->capabilities & CTDB_CAP_LMASTER) {
3580                                 rec->num_lmasters++;
3581                         }
3582                 }
3583                 if (!(nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED)) {
3584                         rec->num_connected++;
3585                 }
3586         }
3587
3588
3589         /* verify that the recmaster node is still active */
3590         for (j=0; j<nodemap->num; j++) {
3591                 if (nodemap->nodes[j].pnn==rec->recmaster) {
3592                         break;
3593                 }
3594         }
3595
3596         if (j == nodemap->num) {
3597                 DEBUG(DEBUG_ERR, ("Recmaster node %u not in list. Force reelection\n", rec->recmaster));
3598                 force_election(rec, pnn, nodemap);
3599                 return;
3600         }
3601
3602         /* if recovery master is disconnected we must elect a new recmaster */
3603         if (nodemap->nodes[j].flags & NODE_FLAGS_DISCONNECTED) {
3604                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u is disconnected. Force reelection\n", nodemap->nodes[j].pnn));
3605                 force_election(rec, pnn, nodemap);
3606                 return;
3607         }
3608
3609         /* get nodemap from the recovery master to check if it is inactive */
3610         ret = ctdb_ctrl_getnodemap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3611                                    mem_ctx, &recmaster_nodemap);
3612         if (ret != 0) {
3613                 DEBUG(DEBUG_ERR, (__location__ " Unable to get nodemap from recovery master %u\n", 
3614                           nodemap->nodes[j].pnn));
3615                 return;
3616         }
3617
3618
3619         if ((recmaster_nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) &&
3620             (rec->node_flags & NODE_FLAGS_INACTIVE) == 0) {
3621                 DEBUG(DEBUG_NOTICE, ("Recmaster node %u no longer available. Force reelection\n", nodemap->nodes[j].pnn));
3622                 /*
3623                  * update our nodemap to carry the recmaster's notion of
3624                  * its own flags, so that we don't keep freezing the
3625                  * inactive recmaster node...
3626                  */
3627                 nodemap->nodes[j].flags = recmaster_nodemap->nodes[j].flags;
3628                 force_election(rec, pnn, nodemap);
3629                 return;
3630         }
3631
3632         /* verify that we have all ip addresses we should have and we dont
3633          * have addresses we shouldnt have.
3634          */ 
3635         if (ctdb->tunable.disable_ip_failover == 0 &&
3636             rec->takeover_runs_disable_ctx == NULL) {
3637                 if (verify_local_ip_allocation(ctdb, rec, pnn, nodemap) != 0) {
3638                         DEBUG(DEBUG_ERR, (__location__ " Public IPs were inconsistent.\n"));
3639                 }
3640         }
3641
3642
3643         /* if we are not the recmaster then we do not need to check
3644            if recovery is needed
3645          */
3646         if (pnn != rec->recmaster) {
3647                 return;
3648         }
3649
3650
3651         /* ensure our local copies of flags are right */
3652         ret = update_local_flags(rec, nodemap);
3653         if (ret == MONITOR_ELECTION_NEEDED) {
3654                 DEBUG(DEBUG_NOTICE,("update_local_flags() called for a re-election.\n"));
3655                 force_election(rec, pnn, nodemap);
3656                 return;
3657         }
3658         if (ret != MONITOR_OK) {
3659                 DEBUG(DEBUG_ERR,("Unable to update local flags\n"));
3660                 return;
3661         }
3662
3663         if (ctdb->num_nodes != nodemap->num) {
3664                 DEBUG(DEBUG_ERR, (__location__ " ctdb->num_nodes (%d) != nodemap->num (%d) reloading nodes file\n", ctdb->num_nodes, nodemap->num));
3665                 ctdb_load_nodes_file(ctdb);
3666                 return;
3667         }
3668
3669         /* verify that all active nodes agree that we are the recmaster */
3670         switch (verify_recmaster(rec, nodemap, pnn)) {
3671         case MONITOR_RECOVERY_NEEDED:
3672                 /* can not happen */
3673                 return;
3674         case MONITOR_ELECTION_NEEDED:
3675                 force_election(rec, pnn, nodemap);
3676                 return;
3677         case MONITOR_OK:
3678                 break;
3679         case MONITOR_FAILED:
3680                 return;
3681         }
3682
3683
3684         if (rec->need_recovery) {
3685                 /* a previous recovery didn't finish */
3686                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3687                 return;
3688         }
3689
3690         /* verify that all active nodes are in normal mode 
3691            and not in recovery mode 
3692         */
3693         switch (verify_recmode(ctdb, nodemap)) {
3694         case MONITOR_RECOVERY_NEEDED:
3695                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3696                 return;
3697         case MONITOR_FAILED:
3698                 return;
3699         case MONITOR_ELECTION_NEEDED:
3700                 /* can not happen */
3701         case MONITOR_OK:
3702                 break;
3703         }
3704
3705
3706         if (ctdb->recovery_lock_file != NULL) {
3707                 /* We must already hold the recovery lock */
3708                 if (!ctdb_recovery_have_lock(ctdb)) {
3709                         DEBUG(DEBUG_ERR,("Failed recovery lock sanity check.  Force a recovery\n"));
3710                         ctdb_set_culprit(rec, ctdb->pnn);
3711                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3712                         return;
3713                 }
3714         }
3715
3716
3717         /* if there are takeovers requested, perform it and notify the waiters */
3718         if (rec->takeover_runs_disable_ctx == NULL &&
3719             rec->reallocate_requests) {
3720                 process_ipreallocate_requests(ctdb, rec);
3721         }
3722
3723         /* get the nodemap for all active remote nodes
3724          */
3725         remote_nodemaps = talloc_array(mem_ctx, struct ctdb_node_map *, nodemap->num);
3726         if (remote_nodemaps == NULL) {
3727                 DEBUG(DEBUG_ERR, (__location__ " failed to allocate remote nodemap array\n"));
3728                 return;
3729         }
3730         for(i=0; i<nodemap->num; i++) {
3731                 remote_nodemaps[i] = NULL;
3732         }
3733         if (get_remote_nodemaps(ctdb, mem_ctx, nodemap, remote_nodemaps) != 0) {
3734                 DEBUG(DEBUG_ERR,(__location__ " Failed to read remote nodemaps\n"));
3735                 return;
3736         } 
3737
3738         /* verify that all other nodes have the same nodemap as we have
3739         */
3740         for (j=0; j<nodemap->num; j++) {
3741                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3742                         continue;
3743                 }
3744
3745                 if (remote_nodemaps[j] == NULL) {
3746                         DEBUG(DEBUG_ERR,(__location__ " Did not get a remote nodemap for node %d, restarting monitoring\n", j));
3747                         ctdb_set_culprit(rec, j);
3748
3749                         return;
3750                 }
3751
3752                 /* if the nodes disagree on how many nodes there are
3753                    then this is a good reason to try recovery
3754                  */
3755                 if (remote_nodemaps[j]->num != nodemap->num) {
3756                         DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different node count. %u vs %u of the local node\n",
3757                                   nodemap->nodes[j].pnn, remote_nodemaps[j]->num, nodemap->num));
3758                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3759                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3760                         return;
3761                 }
3762
3763                 /* if the nodes disagree on which nodes exist and are
3764                    active, then that is also a good reason to do recovery
3765                  */
3766                 for (i=0;i<nodemap->num;i++) {
3767                         if (remote_nodemaps[j]->nodes[i].pnn != nodemap->nodes[i].pnn) {
3768                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different nodemap pnn for %d (%u vs %u).\n", 
3769                                           nodemap->nodes[j].pnn, i, 
3770                                           remote_nodemaps[j]->nodes[i].pnn, nodemap->nodes[i].pnn));
3771                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3772                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3773                                             vnnmap);
3774                                 return;
3775                         }
3776                 }
3777         }
3778
3779         /*
3780          * Update node flags obtained from each active node. This ensure we have
3781          * up-to-date information for all the nodes.
3782          */
3783         for (j=0; j<nodemap->num; j++) {
3784                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3785                         continue;
3786                 }
3787                 nodemap->nodes[j].flags = remote_nodemaps[j]->nodes[j].flags;
3788         }
3789
3790         for (j=0; j<nodemap->num; j++) {
3791                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3792                         continue;
3793                 }
3794
3795                 /* verify the flags are consistent
3796                 */
3797                 for (i=0; i<nodemap->num; i++) {
3798                         if (nodemap->nodes[i].flags & NODE_FLAGS_DISCONNECTED) {
3799                                 continue;
3800                         }
3801                         
3802                         if (nodemap->nodes[i].flags != remote_nodemaps[j]->nodes[i].flags) {
3803                                 DEBUG(DEBUG_ERR, (__location__ " Remote node:%u has different flags for node %u. It has 0x%02x vs our 0x%02x\n", 
3804                                   nodemap->nodes[j].pnn, 
3805                                   nodemap->nodes[i].pnn, 
3806                                   remote_nodemaps[j]->nodes[i].flags,
3807                                   nodemap->nodes[i].flags));
3808                                 if (i == j) {
3809                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from remote node %d for cluster update of its own flags\n", remote_nodemaps[j]->nodes[i].flags, j));
3810                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, remote_nodemaps[j]->nodes[i].flags);
3811                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3812                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3813                                                     vnnmap);
3814                                         return;
3815                                 } else {
3816                                         DEBUG(DEBUG_ERR,("Use flags 0x%02x from local recmaster node for cluster update of node %d flags\n", nodemap->nodes[i].flags, i));
3817                                         update_flags_on_all_nodes(ctdb, nodemap, nodemap->nodes[i].pnn, nodemap->nodes[i].flags);
3818                                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3819                                         do_recovery(rec, mem_ctx, pnn, nodemap, 
3820                                                     vnnmap);
3821                                         return;
3822                                 }
3823                         }
3824                 }
3825         }
3826
3827
3828         /* There must be the same number of lmasters in the vnn map as
3829          * there are active nodes with the lmaster capability...  or
3830          * do a recovery.
3831          */
3832         if (vnnmap->size != rec->num_lmasters) {
3833                 DEBUG(DEBUG_ERR, (__location__ " The vnnmap count is different from the number of active lmaster nodes: %u vs %u\n",
3834                           vnnmap->size, rec->num_lmasters));
3835                 ctdb_set_culprit(rec, ctdb->pnn);
3836                 do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3837                 return;
3838         }
3839
3840         /* verify that all active nodes in the nodemap also exist in 
3841            the vnnmap.
3842          */
3843         for (j=0; j<nodemap->num; j++) {
3844                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3845                         continue;
3846                 }
3847                 if (nodemap->nodes[j].pnn == pnn) {
3848                         continue;
3849                 }
3850
3851                 for (i=0; i<vnnmap->size; i++) {
3852                         if (vnnmap->map[i] == nodemap->nodes[j].pnn) {
3853                                 break;
3854                         }
3855                 }
3856                 if (i == vnnmap->size) {
3857                         DEBUG(DEBUG_ERR, (__location__ " Node %u is active in the nodemap but did not exist in the vnnmap\n", 
3858                                   nodemap->nodes[j].pnn));
3859                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3860                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3861                         return;
3862                 }
3863         }
3864
3865         
3866         /* verify that all other nodes have the same vnnmap
3867            and are from the same generation
3868          */
3869         for (j=0; j<nodemap->num; j++) {
3870                 if (nodemap->nodes[j].flags & NODE_FLAGS_INACTIVE) {
3871                         continue;
3872                 }
3873                 if (nodemap->nodes[j].pnn == pnn) {
3874                         continue;
3875                 }
3876
3877                 ret = ctdb_ctrl_getvnnmap(ctdb, CONTROL_TIMEOUT(), nodemap->nodes[j].pnn, 
3878                                           mem_ctx, &remote_vnnmap);
3879                 if (ret != 0) {
3880                         DEBUG(DEBUG_ERR, (__location__ " Unable to get vnnmap from remote node %u\n", 
3881                                   nodemap->nodes[j].pnn));
3882                         return;
3883                 }
3884
3885                 /* verify the vnnmap generation is the same */
3886                 if (vnnmap->generation != remote_vnnmap->generation) {
3887                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different generation of vnnmap. %u vs %u (ours)\n", 
3888                                   nodemap->nodes[j].pnn, remote_vnnmap->generation, vnnmap->generation));
3889                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3890                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3891                         return;
3892                 }
3893
3894                 /* verify the vnnmap size is the same */
3895                 if (vnnmap->size != remote_vnnmap->size) {
3896                         DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different size of vnnmap. %u vs %u (ours)\n", 
3897                                   nodemap->nodes[j].pnn, remote_vnnmap->size, vnnmap->size));
3898                         ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3899                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3900                         return;
3901                 }
3902
3903                 /* verify the vnnmap is the same */
3904                 for (i=0;i<vnnmap->size;i++) {
3905                         if (remote_vnnmap->map[i] != vnnmap->map[i]) {
3906                                 DEBUG(DEBUG_ERR, (__location__ " Remote node %u has different vnnmap.\n", 
3907                                           nodemap->nodes[j].pnn));
3908                                 ctdb_set_culprit(rec, nodemap->nodes[j].pnn);
3909                                 do_recovery(rec, mem_ctx, pnn, nodemap, 
3910                                             vnnmap);
3911                                 return;
3912                         }
3913                 }
3914         }
3915
3916         /* we might need to change who has what IP assigned */
3917         if (rec->need_takeover_run) {
3918                 uint32_t culprit = (uint32_t)-1;
3919
3920                 rec->need_takeover_run = false;
3921
3922                 /* update the list of public ips that a node can handle for
3923                    all connected nodes
3924                 */
3925                 ret = ctdb_reload_remote_public_ips(ctdb, rec, nodemap, &culprit);
3926                 if (ret != 0) {
3927                         DEBUG(DEBUG_ERR,("Failed to read public ips from remote node %d\n",
3928                                          culprit));
3929                         rec->need_takeover_run = true;
3930                         return;
3931                 }
3932
3933                 /* execute the "startrecovery" event script on all nodes */
3934                 ret = run_startrecovery_eventscript(rec, nodemap);
3935                 if (ret!=0) {
3936                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'startrecovery' event on cluster\n"));
3937                         ctdb_set_culprit(rec, ctdb->pnn);
3938                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3939                         return;
3940                 }
3941
3942                 /* If takeover run fails, then the offending nodes are
3943                  * assigned ban culprit counts. And we re-try takeover.
3944                  * If takeover run fails repeatedly, the node would get
3945                  * banned.
3946                  *
3947                  * If rec->need_takeover_run is not set to true at this
3948                  * failure, monitoring is disabled cluster-wide (via
3949                  * startrecovery eventscript) and will not get enabled.
3950                  */
3951                 if (!do_takeover_run(rec, nodemap, true)) {
3952                         return;
3953                 }
3954
3955                 /* execute the "recovered" event script on all nodes */
3956                 ret = run_recovered_eventscript(rec, nodemap, "monitor_cluster");
3957 #if 0
3958 // we cant check whether the event completed successfully
3959 // since this script WILL fail if the node is in recovery mode
3960 // and if that race happens, the code here would just cause a second
3961 // cascading recovery.
3962                 if (ret!=0) {
3963                         DEBUG(DEBUG_ERR, (__location__ " Unable to run the 'recovered' event on cluster. Update of public ips failed.\n"));
3964                         ctdb_set_culprit(rec, ctdb->pnn);
3965                         do_recovery(rec, mem_ctx, pnn, nodemap, vnnmap);
3966                 }
3967 #endif
3968         }
3969 }
3970
3971 /*
3972   the main monitoring loop
3973  */
3974 static void monitor_cluster(struct ctdb_context *ctdb)
3975 {
3976         struct ctdb_recoverd *rec;
3977
3978         DEBUG(DEBUG_NOTICE,("monitor_cluster starting\n"));
3979
3980         rec = talloc_zero(ctdb, struct ctdb_recoverd);
3981         CTDB_NO_MEMORY_FATAL(ctdb, rec);
3982
3983         rec->ctdb = ctdb;
3984
3985         rec->takeover_run_in_progress = false;
3986
3987         rec->priority_time = timeval_current();
3988
3989         /* register a message port for sending memory dumps */
3990         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_MEM_DUMP, mem_dump_handler, rec);
3991
3992         /* register a message port for requesting logs */
3993         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_GETLOG, getlog_handler, rec);
3994
3995         /* register a message port for clearing logs */
3996         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_CLEARLOG, clearlog_handler, rec);
3997
3998         /* register a message port for recovery elections */
3999         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECOVERY, election_handler, rec);
4000
4001         /* when nodes are disabled/enabled */
4002         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_SET_NODE_FLAGS, monitor_handler, rec);
4003
4004         /* when we are asked to puch out a flag change */
4005         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_PUSH_NODE_FLAGS, push_flags_handler, rec);
4006
4007         /* register a message port for vacuum fetch */
4008         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_VACUUM_FETCH, vacuum_fetch_handler, rec);
4009
4010         /* register a message port for reloadnodes  */
4011         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RELOAD_NODES, reload_nodes_handler, rec);
4012
4013         /* register a message port for performing a takeover run */
4014         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_TAKEOVER_RUN, ip_reallocate_handler, rec);
4015
4016         /* register a message port for disabling the ip check for a short while */
4017         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_DISABLE_IP_CHECK, disable_ip_check_handler, rec);
4018
4019         /* register a message port for updating the recovery daemons node assignment for an ip */
4020         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_RECD_UPDATE_IP, recd_update_ip_handler, rec);
4021
4022         /* register a message port for forcing a rebalance of a node next
4023            reallocation */
4024         ctdb_client_set_message_handler(ctdb, CTDB_SRVID_REBALANCE_NODE, recd_node_rebalance_handler, rec);
4025
4026         /* Register a message port for disabling takeover runs */
4027         ctdb_client_set_message_handler(ctdb,
4028                                         CTDB_SRVID_DISABLE_TAKEOVER_RUNS,
4029                                         disable_takeover_runs_handler, rec);
4030
4031         /* register a message port for detaching database */
4032         ctdb_client_set_message_handler(ctdb,
4033                                         CTDB_SRVID_DETACH_DATABASE,
4034                                         detach_database_handler, rec);
4035
4036         for (;;) {
4037                 TALLOC_CTX *mem_ctx = talloc_new(ctdb);
4038                 struct timeval start;
4039                 double elapsed;
4040
4041                 if (!mem_ctx) {
4042                         DEBUG(DEBUG_CRIT,(__location__
4043                                           " Failed to create temp context\n"));
4044                         exit(-1);
4045                 }
4046
4047                 start = timeval_current();
4048                 main_loop(ctdb, rec, mem_ctx);
4049                 talloc_free(mem_ctx);
4050
4051                 /* we only check for recovery once every second */
4052                 elapsed = timeval_elapsed(&start);
4053                 if (elapsed < ctdb->tunable.recover_interval) {
4054                         ctdb_wait_timeout(ctdb, ctdb->tunable.recover_interval
4055                                           - elapsed);
4056                 }
4057         }
4058 }
4059
4060 /*
4061   event handler for when the main ctdbd dies
4062  */
4063 static void ctdb_recoverd_parent(struct event_context *ev, struct fd_event *fde, 
4064                                  uint16_t flags, void *private_data)
4065 {
4066         DEBUG(DEBUG_ALERT,("recovery daemon parent died - exiting\n"));
4067         _exit(1);
4068 }
4069
4070 /*
4071   called regularly to verify that the recovery daemon is still running
4072  */
4073 static void ctdb_check_recd(struct event_context *ev, struct timed_event *te, 
4074                               struct timeval yt, void *p)
4075 {
4076         struct ctdb_context *ctdb = talloc_get_type(p, struct ctdb_context);
4077
4078         if (ctdb_kill(ctdb, ctdb->recoverd_pid, 0) != 0) {
4079                 DEBUG(DEBUG_ERR,("Recovery daemon (pid:%d) is no longer running. Trying to restart recovery daemon.\n", (int)ctdb->recoverd_pid));
4080
4081                 event_add_timed(ctdb->ev, ctdb, timeval_zero(), 
4082                                 ctdb_restart_recd, ctdb);
4083
4084                 return;
4085         }
4086
4087         event_add_timed(ctdb->ev, ctdb->recd_ctx,
4088                         timeval_current_ofs(30, 0),
4089                         ctdb_check_recd, ctdb);
4090 }
4091
4092 static void recd_sig_child_handler(struct event_context *ev,
4093         struct signal_event *se, int signum, int count,
4094         void *dont_care, 
4095         void *private_data)
4096 {
4097 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4098         int status;
4099         pid_t pid = -1;
4100
4101         while (pid != 0) {
4102                 pid = waitpid(-1, &status, WNOHANG);
4103                 if (pid == -1) {
4104                         if (errno != ECHILD) {
4105                                 DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%s(%d)\n", strerror(errno),errno));
4106                         }
4107                         return;
4108                 }
4109                 if (pid > 0) {
4110                         DEBUG(DEBUG_DEBUG, ("RECD SIGCHLD from %d\n", (int)pid));
4111                 }
4112         }
4113 }
4114
4115 /*
4116   startup the recovery daemon as a child of the main ctdb daemon
4117  */
4118 int ctdb_start_recoverd(struct ctdb_context *ctdb)
4119 {
4120         int fd[2];
4121         struct signal_event *se;
4122         struct tevent_fd *fde;
4123
4124         if (pipe(fd) != 0) {
4125                 return -1;
4126         }
4127
4128         ctdb->recoverd_pid = ctdb_fork_no_free_ringbuffer(ctdb);
4129         if (ctdb->recoverd_pid == -1) {
4130                 return -1;
4131         }
4132
4133         if (ctdb->recoverd_pid != 0) {
4134                 talloc_free(ctdb->recd_ctx);
4135                 ctdb->recd_ctx = talloc_new(ctdb);
4136                 CTDB_NO_MEMORY(ctdb, ctdb->recd_ctx);
4137
4138                 close(fd[0]);
4139                 event_add_timed(ctdb->ev, ctdb->recd_ctx,
4140                                 timeval_current_ofs(30, 0),
4141                                 ctdb_check_recd, ctdb);
4142                 return 0;
4143         }
4144
4145         close(fd[1]);
4146
4147         srandom(getpid() ^ time(NULL));
4148
4149         /* Clear the log ringbuffer */
4150         ctdb_clear_log(ctdb);
4151
4152         ctdb_set_process_name("ctdb_recovered");
4153         if (switch_from_server_to_client(ctdb, "recoverd") != 0) {
4154                 DEBUG(DEBUG_CRIT, (__location__ "ERROR: failed to switch recovery daemon into client mode. shutting down.\n"));
4155                 exit(1);
4156         }
4157
4158         DEBUG(DEBUG_DEBUG, (__location__ " Created PIPE FD:%d to recovery daemon\n", fd[0]));
4159
4160         fde = event_add_fd(ctdb->ev, ctdb, fd[0], EVENT_FD_READ,
4161                      ctdb_recoverd_parent, &fd[0]);
4162         tevent_fd_set_auto_close(fde);
4163
4164         /* set up a handler to pick up sigchld */
4165         se = event_add_signal(ctdb->ev, ctdb,
4166                                      SIGCHLD, 0,
4167                                      recd_sig_child_handler,
4168                                      ctdb);
4169         if (se == NULL) {
4170                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD in recovery daemon\n"));
4171                 exit(1);
4172         }
4173
4174         monitor_cluster(ctdb);
4175
4176         DEBUG(DEBUG_ALERT,("ERROR: ctdb_recoverd finished!?\n"));
4177         return -1;
4178 }
4179
4180 /*
4181   shutdown the recovery daemon
4182  */
4183 void ctdb_stop_recoverd(struct ctdb_context *ctdb)
4184 {
4185         if (ctdb->recoverd_pid == 0) {
4186                 return;
4187         }
4188
4189         DEBUG(DEBUG_NOTICE,("Shutting down recovery daemon\n"));
4190         ctdb_kill(ctdb, ctdb->recoverd_pid, SIGTERM);
4191
4192         TALLOC_FREE(ctdb->recd_ctx);
4193         TALLOC_FREE(ctdb->recd_ping_count);
4194 }
4195
4196 static void ctdb_restart_recd(struct event_context *ev, struct timed_event *te, 
4197                        struct timeval t, void *private_data)
4198 {
4199         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
4200
4201         DEBUG(DEBUG_ERR,("Restarting recovery daemon\n"));
4202         ctdb_stop_recoverd(ctdb);
4203         ctdb_start_recoverd(ctdb);
4204 }