ctdb/server/ctdb_daemon.c set socket close on exec
[metze/samba/wip.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
38
39 #include "common/version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
42
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
50
51 struct ctdb_client_pid_list {
52         struct ctdb_client_pid_list *next, *prev;
53         struct ctdb_context *ctdb;
54         pid_t pid;
55         struct ctdb_client *client;
56 };
57
58 const char *ctdbd_pidfile = NULL;
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
60
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
62
63 static pid_t __ctdbd_pid;
64
65 static void print_exit_message(void)
66 {
67         if (getpid() == __ctdbd_pid) {
68                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69
70                 /* Wait a second to allow pending log messages to be flushed */
71                 sleep(1);
72         }
73 }
74
75
76
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78                                   struct timeval t, void *private_data)
79 {
80         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
81
82         if (getpid() != ctdb->ctdbd_pid) {
83                 return;
84         }
85
86         tevent_add_timer(ctdb->ev, ctdb,
87                          timeval_current_ofs(1, 0),
88                          ctdb_time_tick, ctdb);
89 }
90
91 /* Used to trigger a dummy event once per second, to make
92  * detection of hangs more reliable.
93  */
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
95 {
96         tevent_add_timer(ctdb->ev, ctdb,
97                          timeval_current_ofs(1, 0),
98                          ctdb_time_tick, ctdb);
99 }
100
101 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
102 {
103         /* start monitoring for connected/disconnected nodes */
104         ctdb_start_keepalive(ctdb);
105
106         /* start periodic update of tcp tickle lists */
107         ctdb_start_tcp_tickle_update(ctdb);
108
109         /* start listening for recovery daemon pings */
110         ctdb_control_recd_ping(ctdb);
111
112         /* start listening to timer ticks */
113         ctdb_start_time_tickd(ctdb);
114 }
115
116 static void ignore_signal(int signum)
117 {
118         struct sigaction act;
119
120         memset(&act, 0, sizeof(act));
121
122         act.sa_handler = SIG_IGN;
123         sigemptyset(&act.sa_mask);
124         sigaddset(&act.sa_mask, signum);
125         sigaction(signum, &act, NULL);
126 }
127
128
129 /*
130   send a packet to a client
131  */
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
133 {
134         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
135         if (hdr->operation == CTDB_REQ_MESSAGE) {
136                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138                         talloc_free(client);
139                         return -1;
140                 }
141         }
142         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
143 }
144
145 /*
146   message handler for when we are in daemon mode. This redirects the message
147   to the right client
148  */
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
150                                    void *private_data)
151 {
152         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153         struct ctdb_req_message_old *r;
154         int len;
155
156         /* construct a message to send to the client containing the data */
157         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159                                len, struct ctdb_req_message_old);
160         CTDB_NO_MEMORY_VOID(client->ctdb, r);
161
162         talloc_set_name_const(r, "req_message packet");
163
164         r->srvid         = srvid;
165         r->datalen       = data.dsize;
166         memcpy(&r->data[0], data.dptr, data.dsize);
167
168         daemon_queue_send(client, &r->hdr);
169
170         talloc_free(r);
171 }
172
173 /*
174   this is called when the ctdb daemon received a ctdb request to 
175   set the srvid from the client
176  */
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
178 {
179         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
180         int res;
181         if (client == NULL) {
182                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
183                 return -1;
184         }
185         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
186                              client);
187         if (res != 0) {
188                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
189                          (unsigned long long)srvid));
190         } else {
191                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
192                          (unsigned long long)srvid));
193         }
194
195         return res;
196 }
197
198 /*
199   this is called when the ctdb daemon received a ctdb request to 
200   remove a srvid from the client
201  */
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
203 {
204         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205         if (client == NULL) {
206                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
207                 return -1;
208         }
209         return srvid_deregister(ctdb->srv, srvid, client);
210 }
211
212 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
213                            void *private_data)
214 {
215         struct ctdb_client *client =
216                 talloc_get_type_abort(private_data, struct ctdb_client);
217         struct ctdb_req_tunnel_old *c, *pkt;
218         size_t len;
219
220         pkt = (struct ctdb_req_tunnel_old *)data.dptr;
221
222         len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
223         c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
224                                len, struct ctdb_req_tunnel_old);
225         if (c == NULL) {
226                 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
227                 return;
228         }
229
230         talloc_set_name_const(c, "req_tunnel packet");
231
232         c->tunnel_id = tunnel_id;
233         c->flags = pkt->flags;
234         c->datalen = pkt->datalen;
235         memcpy(c->data, pkt->data, pkt->datalen);
236
237         daemon_queue_send(client, &c->hdr);
238
239         talloc_free(c);
240 }
241
242 /*
243   destroy a ctdb_client
244 */
245 static int ctdb_client_destructor(struct ctdb_client *client)
246 {
247         struct ctdb_db_context *ctdb_db;
248
249         ctdb_takeover_client_destructor_hook(client);
250         reqid_remove(client->ctdb->idr, client->client_id);
251         client->ctdb->num_clients--;
252
253         if (client->num_persistent_updates != 0) {
254                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
256         }
257         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
258         if (ctdb_db) {
259                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260                                   "commit active. Forcing recovery.\n"));
261                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
262
263                 /*
264                  * trans3 transaction state:
265                  *
266                  * The destructor sets the pointer to NULL.
267                  */
268                 talloc_free(ctdb_db->persistent_state);
269         }
270
271         return 0;
272 }
273
274
275 /*
276   this is called when the ctdb daemon received a ctdb request message
277   from a local client over the unix domain socket
278  */
279 static void daemon_request_message_from_client(struct ctdb_client *client, 
280                                                struct ctdb_req_message_old *c)
281 {
282         TDB_DATA data;
283         int res;
284
285         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
287         }
288
289         /* maybe the message is for another client on this node */
290         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292                 return;
293         }
294
295         /* its for a remote node */
296         data.dptr = &c->data[0];
297         data.dsize = c->datalen;
298         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
299                                        c->srvid, data);
300         if (res != 0) {
301                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
302                          c->hdr.destnode));
303         }
304 }
305
306
307 struct daemon_call_state {
308         struct ctdb_client *client;
309         uint32_t reqid;
310         struct ctdb_call *call;
311         struct timeval start_time;
312
313         /* readonly request ? */
314         uint32_t readonly_fetch;
315         uint32_t client_callid;
316 };
317
318 /* 
319    complete a call from a client 
320 */
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
322 {
323         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
324                                                            struct daemon_call_state);
325         struct ctdb_reply_call_old *r;
326         int res;
327         uint32_t length;
328         struct ctdb_client *client = dstate->client;
329         struct ctdb_db_context *ctdb_db = state->ctdb_db;
330
331         talloc_steal(client, dstate);
332         talloc_steal(dstate, dstate->call);
333
334         res = ctdb_daemon_call_recv(state, dstate->call);
335         if (res != 0) {
336                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
338
339                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340                 return;
341         }
342
343         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344         /* If the client asked for readonly FETCH, we remapped this to 
345            FETCH_WITH_HEADER when calling the daemon. So we must
346            strip the extra header off the reply data before passing
347            it back to the client.
348         */
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 length -= sizeof(struct ctdb_ltdb_header);
352         }
353
354         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
355                                length, struct ctdb_reply_call_old);
356         if (r == NULL) {
357                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
360                 return;
361         }
362         r->hdr.reqid        = dstate->reqid;
363         r->status           = dstate->call->status;
364
365         if (dstate->readonly_fetch
366         && dstate->client_callid == CTDB_FETCH_FUNC) {
367                 /* client only asked for a FETCH so we must strip off
368                    the extra ctdb_ltdb header
369                 */
370                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
372         } else {
373                 r->datalen          = dstate->call->reply_data.dsize;
374                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
375         }
376
377         res = daemon_queue_send(client, &r->hdr);
378         if (res == -1) {
379                 /* client is dead - return immediately */
380                 return;
381         }
382         if (res != 0) {
383                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
384         }
385         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
387         talloc_free(dstate);
388 }
389
390 struct ctdb_daemon_packet_wrap {
391         struct ctdb_context *ctdb;
392         uint32_t client_id;
393 };
394
395 /*
396   a wrapper to catch disconnected clients
397  */
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
399 {
400         struct ctdb_client *client;
401         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
402                                                             struct ctdb_daemon_packet_wrap);
403         if (w == NULL) {
404                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405                 return;
406         }
407
408         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409         if (client == NULL) {
410                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
411                          w->client_id));
412                 talloc_free(w);
413                 return;
414         }
415         talloc_free(w);
416
417         /* process it */
418         daemon_incoming_packet(client, hdr);    
419 }
420
421 struct ctdb_deferred_fetch_call {
422         struct ctdb_deferred_fetch_call *next, *prev;
423         struct ctdb_req_call_old *c;
424         struct ctdb_daemon_packet_wrap *w;
425 };
426
427 struct ctdb_deferred_fetch_queue {
428         struct ctdb_deferred_fetch_call *deferred_calls;
429 };
430
431 struct ctdb_deferred_requeue {
432         struct ctdb_deferred_fetch_call *dfc;
433         struct ctdb_client *client;
434 };
435
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438                                     struct tevent_timer *te,
439                                     struct timeval t, void *private_data)
440 {
441         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442         struct ctdb_client *client = dfr->client;
443
444         talloc_steal(client, dfr->dfc->c);
445         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446         talloc_free(dfr);
447 }
448
449 /* the referral context is destroyed either after a timeout or when the initial
450    fetch-lock has finished.
451    at this stage, immediately start reprocessing the queued up deferred
452    calls so they get reprocessed immediately (and since we are dmaster at
453    this stage, trigger the waiting smbd processes to pick up and aquire the
454    record right away.
455 */
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
457 {
458
459         /* need to reprocess the packets from the queue explicitely instead of
460            just using a normal destructor since we want, need, to
461            call the clients in the same oder as the requests queued up
462         */
463         while (dfq->deferred_calls != NULL) {
464                 struct ctdb_client *client;
465                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466                 struct ctdb_deferred_requeue *dfr;
467
468                 DLIST_REMOVE(dfq->deferred_calls, dfc);
469
470                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471                 if (client == NULL) {
472                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
473                                  dfc->w->client_id));
474                         continue;
475                 }
476
477                 /* process it by pushing it back onto the eventloop */
478                 dfr = talloc(client, struct ctdb_deferred_requeue);
479                 if (dfr == NULL) {
480                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481                         continue;
482                 }
483
484                 dfr->dfc    = talloc_steal(dfr, dfc);
485                 dfr->client = client;
486
487                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488                                  reprocess_deferred_call, dfr);
489         }
490
491         return 0;
492 }
493
494 /* insert the new deferral context into the rb tree.
495    there should never be a pre-existing context here, but check for it
496    warn and destroy the previous context if there is already a deferral context
497    for this key.
498 */
499 static void *insert_dfq_callback(void *parm, void *data)
500 {
501         if (data) {
502                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
503                 talloc_free(data);
504         }
505         return parm;
506 }
507
508 /* if the original fetch-lock did not complete within a reasonable time,
509    free the context and context for all deferred requests to cause them to be
510    re-inserted into the event system.
511 */
512 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
513                         struct timeval t, void *private_data)
514 {
515         talloc_free(private_data);
516 }
517
518 /* This function is used in the local daemon to register a KEY in a database
519    for being "fetched"
520    While the remote fetch is in-flight, any futher attempts to re-fetch the
521    same record will be deferred until the fetch completes.
522 */
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
524 {
525         uint32_t *k;
526         struct ctdb_deferred_fetch_queue *dfq;
527
528         k = ctdb_key_to_idkey(call, call->key);
529         if (k == NULL) {
530                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531                 return -1;
532         }
533
534         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
535         if (dfq == NULL) {
536                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537                 talloc_free(k);
538                 return -1;
539         }
540         dfq->deferred_calls = NULL;
541
542         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
543
544         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
545
546         /* if the fetch havent completed in 30 seconds, just tear it all down
547            and let it try again as the events are reissued */
548         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
549                          dfq_timeout, dfq);
550
551         talloc_free(k);
552         return 0;
553 }
554
555 /* check if this is a duplicate request to a fetch already in-flight
556    if it is, make this call deferred to be reprocessed later when
557    the in-flight fetch completes.
558 */
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
560 {
561         uint32_t *k;
562         struct ctdb_deferred_fetch_queue *dfq;
563         struct ctdb_deferred_fetch_call *dfc;
564
565         k = ctdb_key_to_idkey(c, key);
566         if (k == NULL) {
567                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568                 return -1;
569         }
570
571         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
572         if (dfq == NULL) {
573                 talloc_free(k);
574                 return -1;
575         }
576
577
578         talloc_free(k);
579
580         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
581         if (dfc == NULL) {
582                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583                 return -1;
584         }
585
586         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587         if (dfc->w == NULL) {
588                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
589                 talloc_free(dfc);
590                 return -1;
591         }
592
593         dfc->c = talloc_steal(dfc, c);
594         dfc->w->ctdb = ctdb_db->ctdb;
595         dfc->w->client_id = client->client_id;
596
597         DLIST_ADD_END(dfq->deferred_calls, dfc);
598
599         return 0;
600 }
601
602
603 /*
604   this is called when the ctdb daemon received a ctdb request call
605   from a local client over the unix domain socket
606  */
607 static void daemon_request_call_from_client(struct ctdb_client *client, 
608                                             struct ctdb_req_call_old *c)
609 {
610         struct ctdb_call_state *state;
611         struct ctdb_db_context *ctdb_db;
612         struct daemon_call_state *dstate;
613         struct ctdb_call *call;
614         struct ctdb_ltdb_header header;
615         TDB_DATA key, data;
616         int ret;
617         struct ctdb_context *ctdb = client->ctdb;
618         struct ctdb_daemon_packet_wrap *w;
619
620         CTDB_INCREMENT_STAT(ctdb, total_calls);
621         CTDB_INCREMENT_STAT(ctdb, pending_calls);
622
623         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
624         if (!ctdb_db) {
625                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
626                           c->db_id));
627                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628                 return;
629         }
630
631         if (ctdb_db->unhealthy_reason) {
632                 /*
633                  * this is just a warning, as the tdb should be empty anyway,
634                  * and only persistent databases can be unhealthy, which doesn't
635                  * use this code patch
636                  */
637                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
639         }
640
641         key.dptr = c->data;
642         key.dsize = c->keylen;
643
644         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645         CTDB_NO_MEMORY_VOID(ctdb, w);   
646
647         w->ctdb = ctdb;
648         w->client_id = client->client_id;
649
650         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
651                                            (struct ctdb_req_header *)c, &data,
652                                            daemon_incoming_packet_wrap, w, true);
653         if (ret == -2) {
654                 /* will retry later */
655                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
656                 return;
657         }
658
659         talloc_free(w);
660
661         if (ret != 0) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664                 return;
665         }
666
667
668         /* check if this fetch request is a duplicate for a
669            request we already have in flight. If so defer it until
670            the first request completes.
671         */
672         if (ctdb->tunable.fetch_collapse == 1) {
673                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674                         ret = ctdb_ltdb_unlock(ctdb_db, key);
675                         if (ret != 0) {
676                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
677                         }
678                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
679                         talloc_free(data.dptr);
680                         return;
681                 }
682         }
683
684         /* Dont do READONLY if we don't have a tracking database */
685         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
686                 c->flags &= ~CTDB_WANT_READONLY;
687         }
688
689         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
690                 header.flags &= ~CTDB_REC_RO_FLAGS;
691                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
692                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
693                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
694                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
695                 }
696                 /* and clear out the tracking data */
697                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
698                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
699                 }
700         }
701
702         /* if we are revoking, we must defer all other calls until the revoke
703          * had completed.
704          */
705         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
706                 talloc_free(data.dptr);
707                 ret = ctdb_ltdb_unlock(ctdb_db, key);
708
709                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
710                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
711                 }
712                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
713                 return;
714         }
715
716         if ((header.dmaster == ctdb->pnn)
717         && (!(c->flags & CTDB_WANT_READONLY))
718         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
719                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
720                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
721                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
722                 }
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724
725                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
726                         ctdb_fatal(ctdb, "Failed to start record revoke");
727                 }
728                 talloc_free(data.dptr);
729
730                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
731                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
732                 }
733
734                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
735                 return;
736         }               
737
738         dstate = talloc(client, struct daemon_call_state);
739         if (dstate == NULL) {
740                 ret = ctdb_ltdb_unlock(ctdb_db, key);
741                 if (ret != 0) {
742                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
743                 }
744
745                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
746                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
747                 return;
748         }
749         dstate->start_time = timeval_current();
750         dstate->client = client;
751         dstate->reqid  = c->hdr.reqid;
752         talloc_steal(dstate, data.dptr);
753
754         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
755         if (call == NULL) {
756                 ret = ctdb_ltdb_unlock(ctdb_db, key);
757                 if (ret != 0) {
758                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
759                 }
760
761                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
762                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
763                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
764                 return;
765         }
766
767         dstate->readonly_fetch = 0;
768         call->call_id = c->callid;
769         call->key = key;
770         call->call_data.dptr = c->data + c->keylen;
771         call->call_data.dsize = c->calldatalen;
772         call->flags = c->flags;
773
774         if (c->flags & CTDB_WANT_READONLY) {
775                 /* client wants readonly record, so translate this into a 
776                    fetch with header. remember what the client asked for
777                    so we can remap the reply back to the proper format for
778                    the client in the reply
779                  */
780                 dstate->client_callid = call->call_id;
781                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
782                 dstate->readonly_fetch = 1;
783         }
784
785         if (header.dmaster == ctdb->pnn) {
786                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
787         } else {
788                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
789                 if (ctdb->tunable.fetch_collapse == 1) {
790                         /* This request triggered a remote fetch-lock.
791                            set up a deferral for this key so any additional
792                            fetch-locks are deferred until the current one
793                            finishes.
794                          */
795                         setup_deferred_fetch_locks(ctdb_db, call);
796                 }
797         }
798
799         ret = ctdb_ltdb_unlock(ctdb_db, key);
800         if (ret != 0) {
801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
802         }
803
804         if (state == NULL) {
805                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
806                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
807                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
808                 return;
809         }
810         talloc_steal(state, dstate);
811         talloc_steal(client, state);
812
813         state->async.fn = daemon_call_from_client_callback;
814         state->async.private_data = dstate;
815 }
816
817
818 static void daemon_request_control_from_client(struct ctdb_client *client, 
819                                                struct ctdb_req_control_old *c);
820 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
821                                               struct ctdb_req_tunnel_old *c);
822
823 /* data contains a packet from the client */
824 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
825 {
826         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
827         TALLOC_CTX *tmp_ctx;
828         struct ctdb_context *ctdb = client->ctdb;
829
830         /* place the packet as a child of a tmp_ctx. We then use
831            talloc_free() below to free it. If any of the calls want
832            to keep it, then they will steal it somewhere else, and the
833            talloc_free() will be a no-op */
834         tmp_ctx = talloc_new(client);
835         talloc_steal(tmp_ctx, hdr);
836
837         if (hdr->ctdb_magic != CTDB_MAGIC) {
838                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
839                 goto done;
840         }
841
842         if (hdr->ctdb_version != CTDB_PROTOCOL) {
843                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
844                 goto done;
845         }
846
847         switch (hdr->operation) {
848         case CTDB_REQ_CALL:
849                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
850                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
851                 break;
852
853         case CTDB_REQ_MESSAGE:
854                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
855                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
856                 break;
857
858         case CTDB_REQ_CONTROL:
859                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
860                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
861                 break;
862
863         case CTDB_REQ_TUNNEL:
864                 CTDB_INCREMENT_STAT(ctdb, client.req_tunnel);
865                 daemon_request_tunnel_from_client(client, (struct ctdb_req_tunnel_old *)hdr);
866                 break;
867
868         default:
869                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
870                          hdr->operation));
871         }
872
873 done:
874         talloc_free(tmp_ctx);
875 }
876
877 /*
878   called when the daemon gets a incoming packet
879  */
880 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
881 {
882         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
883         struct ctdb_req_header *hdr;
884
885         if (cnt == 0) {
886                 talloc_free(client);
887                 return;
888         }
889
890         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
891
892         if (cnt < sizeof(*hdr)) {
893                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
894                                (unsigned)cnt);
895                 return;
896         }
897         hdr = (struct ctdb_req_header *)data;
898         if (cnt != hdr->length) {
899                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
900                                (unsigned)hdr->length, (unsigned)cnt);
901                 return;
902         }
903
904         if (hdr->ctdb_magic != CTDB_MAGIC) {
905                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
906                 return;
907         }
908
909         if (hdr->ctdb_version != CTDB_PROTOCOL) {
910                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
911                 return;
912         }
913
914         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
915                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
916                  hdr->srcnode, hdr->destnode));
917
918         /* it is the responsibility of the incoming packet function to free 'data' */
919         daemon_incoming_packet(client, hdr);
920 }
921
922
923 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
924 {
925         if (client_pid->ctdb->client_pids != NULL) {
926                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
927         }
928
929         return 0;
930 }
931
932
933 static void ctdb_accept_client(struct tevent_context *ev,
934                                struct tevent_fd *fde, uint16_t flags,
935                                void *private_data)
936 {
937         struct sockaddr_un addr;
938         socklen_t len;
939         int fd;
940         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
941         struct ctdb_client *client;
942         struct ctdb_client_pid_list *client_pid;
943         pid_t peer_pid = 0;
944         int ret;
945
946         memset(&addr, 0, sizeof(addr));
947         len = sizeof(addr);
948         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
949         if (fd == -1) {
950                 return;
951         }
952         smb_set_close_on_exec(fd);
953
954         ret = set_blocking(fd, false);
955         if (ret != 0) {
956                 DEBUG(DEBUG_ERR,
957                       (__location__
958                        " failed to set socket non-blocking (%s)\n",
959                        strerror(errno)));
960                 close(fd);
961                 return;
962         }
963
964         set_close_on_exec(fd);
965
966         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
967
968         client = talloc_zero(ctdb, struct ctdb_client);
969         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
970                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
971         }
972
973         client->ctdb = ctdb;
974         client->fd = fd;
975         client->client_id = reqid_new(ctdb->idr, client);
976         client->pid = peer_pid;
977
978         client_pid = talloc(client, struct ctdb_client_pid_list);
979         if (client_pid == NULL) {
980                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
981                 close(fd);
982                 talloc_free(client);
983                 return;
984         }               
985         client_pid->ctdb   = ctdb;
986         client_pid->pid    = peer_pid;
987         client_pid->client = client;
988
989         DLIST_ADD(ctdb->client_pids, client_pid);
990
991         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
992                                          ctdb_daemon_read_cb, client,
993                                          "client-%u", client->pid);
994
995         talloc_set_destructor(client, ctdb_client_destructor);
996         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
997         ctdb->num_clients++;
998 }
999
1000
1001
1002 /*
1003   create a unix domain socket and bind it
1004   return a file descriptor open on the socket 
1005 */
1006 static int ux_socket_bind(struct ctdb_context *ctdb)
1007 {
1008         struct sockaddr_un addr;
1009         int ret;
1010
1011         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1012         if (ctdb->daemon.sd == -1) {
1013                 return -1;
1014         }
1015
1016         memset(&addr, 0, sizeof(addr));
1017         addr.sun_family = AF_UNIX;
1018         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1019
1020         if (! sock_clean(ctdb->daemon.name)) {
1021                 return -1;
1022         }
1023
1024         set_close_on_exec(ctdb->daemon.sd);
1025
1026         ret = set_blocking(ctdb->daemon.sd, false);
1027         if (ret != 0) {
1028                 DEBUG(DEBUG_ERR,
1029                       (__location__
1030                        " failed to set socket non-blocking (%s)\n",
1031                        strerror(errno)));
1032                 goto failed;
1033         }
1034
1035         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1036                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1037                 goto failed;
1038         }
1039
1040         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1041             chmod(ctdb->daemon.name, 0700) != 0) {
1042                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1043                 goto failed;
1044         }
1045
1046
1047         if (listen(ctdb->daemon.sd, 100) != 0) {
1048                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1049                 goto failed;
1050         }
1051
1052         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1053                              ctdb->daemon.name));
1054         return 0;
1055
1056 failed:
1057         close(ctdb->daemon.sd);
1058         ctdb->daemon.sd = -1;
1059         return -1;      
1060 }
1061
1062 static void initialise_node_flags (struct ctdb_context *ctdb)
1063 {
1064         if (ctdb->pnn == -1) {
1065                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1066         }
1067
1068         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1069
1070         /* do we start out in DISABLED mode? */
1071         if (ctdb->start_as_disabled != 0) {
1072                 DEBUG(DEBUG_ERR,
1073                       ("This node is configured to start in DISABLED state\n"));
1074                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1075         }
1076         /* do we start out in STOPPED mode? */
1077         if (ctdb->start_as_stopped != 0) {
1078                 DEBUG(DEBUG_ERR,
1079                       ("This node is configured to start in STOPPED state\n"));
1080                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1081         }
1082 }
1083
1084 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1085                                       void *private_data)
1086 {
1087         if (status != 0) {
1088                 ctdb_die(ctdb, "Failed to run setup event");
1089         }
1090         ctdb_run_notification_script(ctdb, "setup");
1091
1092         /* Start the recovery daemon */
1093         if (ctdb_start_recoverd(ctdb) != 0) {
1094                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1095                 exit(11);
1096         }
1097
1098         ctdb_start_periodic_events(ctdb);
1099
1100         ctdb_wait_for_first_recovery(ctdb);
1101 }
1102
1103 static struct timeval tevent_before_wait_ts;
1104 static struct timeval tevent_after_wait_ts;
1105
1106 static void ctdb_tevent_trace_init(void)
1107 {
1108         struct timeval now;
1109
1110         now = timeval_current();
1111
1112         tevent_before_wait_ts = now;
1113         tevent_after_wait_ts = now;
1114 }
1115
1116 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1117                               void *private_data)
1118 {
1119         struct timeval diff;
1120         struct timeval now;
1121         struct ctdb_context *ctdb =
1122                 talloc_get_type(private_data, struct ctdb_context);
1123
1124         if (getpid() != ctdb->ctdbd_pid) {
1125                 return;
1126         }
1127
1128         now = timeval_current();
1129
1130         switch (tp) {
1131         case TEVENT_TRACE_BEFORE_WAIT:
1132                 diff = timeval_until(&tevent_after_wait_ts, &now);
1133                 if (diff.tv_sec > 3) {
1134                         DEBUG(DEBUG_ERR,
1135                               ("Handling event took %ld seconds!\n",
1136                                (long)diff.tv_sec));
1137                 }
1138                 tevent_before_wait_ts = now;
1139                 break;
1140
1141         case TEVENT_TRACE_AFTER_WAIT:
1142                 diff = timeval_until(&tevent_before_wait_ts, &now);
1143                 if (diff.tv_sec > 3) {
1144                         DEBUG(DEBUG_ERR,
1145                               ("No event for %ld seconds!\n",
1146                                (long)diff.tv_sec));
1147                 }
1148                 tevent_after_wait_ts = now;
1149                 break;
1150
1151         default:
1152                 /* Do nothing for future tevent trace points */ ;
1153         }
1154 }
1155
1156 static void ctdb_remove_pidfile(void)
1157 {
1158         TALLOC_FREE(ctdbd_pidfile_ctx);
1159 }
1160
1161 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1162 {
1163         if (ctdbd_pidfile != NULL) {
1164                 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1165                                                  &ctdbd_pidfile_ctx);
1166                 if (ret != 0) {
1167                         DEBUG(DEBUG_ERR,
1168                               ("Failed to create PID file %s\n",
1169                                ctdbd_pidfile));
1170                         exit(11);
1171                 }
1172
1173                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1174                 atexit(ctdb_remove_pidfile);
1175         }
1176 }
1177
1178 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1179 {
1180         int i, j, count;
1181
1182         /* initialize the vnn mapping table, skipping any deleted nodes */
1183         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1184         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1185
1186         count = 0;
1187         for (i = 0; i < ctdb->num_nodes; i++) {
1188                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1189                         count++;
1190                 }
1191         }
1192
1193         ctdb->vnn_map->generation = INVALID_GENERATION;
1194         ctdb->vnn_map->size = count;
1195         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1196         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1197
1198         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1199                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1200                         continue;
1201                 }
1202                 ctdb->vnn_map->map[j] = i;
1203                 j++;
1204         }
1205 }
1206
1207 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1208 {
1209         int nodeid;
1210
1211         if (ctdb->address == NULL) {
1212                 ctdb_fatal(ctdb,
1213                            "Can not determine PNN - node address is not set\n");
1214         }
1215
1216         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1217         if (nodeid == -1) {
1218                 ctdb_fatal(ctdb,
1219                            "Can not determine PNN - node address not found in node list\n");
1220         }
1221
1222         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1223         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1224 }
1225
1226 /*
1227   start the protocol going as a daemon
1228 */
1229 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1230 {
1231         int res, ret = -1;
1232         struct tevent_fd *fde;
1233
1234         become_daemon(do_fork, false, false);
1235
1236         ignore_signal(SIGPIPE);
1237         ignore_signal(SIGUSR1);
1238
1239         ctdb->ctdbd_pid = getpid();
1240         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1241                           ctdb_version_string, ctdb->ctdbd_pid));
1242         ctdb_create_pidfile(ctdb);
1243
1244         /* create a unix domain stream socket to listen to */
1245         res = ux_socket_bind(ctdb);
1246         if (res!=0) {
1247                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1248                 exit(10);
1249         }
1250
1251         /* Make sure we log something when the daemon terminates.
1252          * This must be the first exit handler to run (so the last to
1253          * be registered.
1254          */
1255         __ctdbd_pid = getpid();
1256         atexit(print_exit_message);
1257
1258         if (ctdb->do_setsched) {
1259                 /* try to set us up as realtime */
1260                 if (!set_scheduler()) {
1261                         exit(1);
1262                 }
1263                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1264         }
1265
1266         ctdb->ev = tevent_context_init(NULL);
1267         if (ctdb->ev == NULL) {
1268                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1269                 exit(1);
1270         }
1271         tevent_loop_allow_nesting(ctdb->ev);
1272         ctdb_tevent_trace_init();
1273         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1274
1275         /* set up a handler to pick up sigchld */
1276         if (ctdb_init_sigchld(ctdb) == NULL) {
1277                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1278                 exit(1);
1279         }
1280
1281         if (do_fork) {
1282                 ctdb_set_child_logging(ctdb);
1283         }
1284
1285         TALLOC_FREE(ctdb->srv);
1286         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1287                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1288                 exit(1);
1289         }
1290
1291         TALLOC_FREE(ctdb->tunnels);
1292         if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
1293                 DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
1294                 exit(1);
1295         }
1296
1297         /* initialize statistics collection */
1298         ctdb_statistics_init(ctdb);
1299
1300         /* force initial recovery for election */
1301         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1302
1303         if (ctdb_start_eventd(ctdb) != 0) {
1304                 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1305                 exit(1);
1306         }
1307
1308         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1309         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1310         if (ret != 0) {
1311                 ctdb_die(ctdb, "Failed to run init event\n");
1312         }
1313         ctdb_run_notification_script(ctdb, "init");
1314
1315         if (strcmp(ctdb->transport, "tcp") == 0) {
1316                 ret = ctdb_tcp_init(ctdb);
1317         }
1318 #ifdef USE_INFINIBAND
1319         if (strcmp(ctdb->transport, "ib") == 0) {
1320                 ret = ctdb_ibw_init(ctdb);
1321         }
1322 #endif
1323         if (ret != 0) {
1324                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1325                 return -1;
1326         }
1327
1328         if (ctdb->methods == NULL) {
1329                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1330                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1331         }
1332
1333         /* Initialise the transport.  This sets the node address if it
1334          * was not set via the command-line. */
1335         if (ctdb->methods->initialise(ctdb) != 0) {
1336                 ctdb_fatal(ctdb, "transport failed to initialise");
1337         }
1338
1339         ctdb_set_my_pnn(ctdb);
1340
1341         initialise_node_flags(ctdb);
1342
1343         if (ctdb->public_addresses_file) {
1344                 ret = ctdb_set_public_addresses(ctdb, true);
1345                 if (ret == -1) {
1346                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1347                         exit(1);
1348                 }
1349         }
1350
1351         ctdb_initialise_vnn_map(ctdb);
1352
1353         /* attach to existing databases */
1354         if (ctdb_attach_databases(ctdb) != 0) {
1355                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1356         }
1357
1358         /* start frozen, then let the first election sort things out */
1359         if (!ctdb_blocking_freeze(ctdb)) {
1360                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1361         }
1362
1363         /* now start accepting clients, only can do this once frozen */
1364         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1365                             ctdb_accept_client, ctdb);
1366         if (fde == NULL) {
1367                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1368         }
1369         tevent_fd_set_auto_close(fde);
1370
1371         /* Start the transport */
1372         if (ctdb->methods->start(ctdb) != 0) {
1373                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1374                 ctdb_fatal(ctdb, "transport failed to start");
1375         }
1376
1377         /* Recovery daemon and timed events are started from the
1378          * callback, only after the setup event completes
1379          * successfully.
1380          */
1381         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1382         ret = ctdb_event_script_callback(ctdb,
1383                                          ctdb,
1384                                          ctdb_setup_event_callback,
1385                                          ctdb,
1386                                          CTDB_EVENT_SETUP,
1387                                          "%s",
1388                                          "");
1389         if (ret != 0) {
1390                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1391                 exit(1);
1392         }
1393
1394         lockdown_memory(ctdb->valgrinding);
1395
1396         /* go into a wait loop to allow other nodes to complete */
1397         tevent_loop_wait(ctdb->ev);
1398
1399         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1400         exit(1);
1401 }
1402
1403 /*
1404   allocate a packet for use in daemon<->daemon communication
1405  */
1406 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1407                                                  TALLOC_CTX *mem_ctx, 
1408                                                  enum ctdb_operation operation, 
1409                                                  size_t length, size_t slength,
1410                                                  const char *type)
1411 {
1412         int size;
1413         struct ctdb_req_header *hdr;
1414
1415         length = MAX(length, slength);
1416         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1417
1418         if (ctdb->methods == NULL) {
1419                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1420                          operation, (unsigned)length));
1421                 return NULL;
1422         }
1423
1424         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1425         if (hdr == NULL) {
1426                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1427                          operation, (unsigned)length));
1428                 return NULL;
1429         }
1430         talloc_set_name_const(hdr, type);
1431         memset(hdr, 0, slength);
1432         hdr->length       = length;
1433         hdr->operation    = operation;
1434         hdr->ctdb_magic   = CTDB_MAGIC;
1435         hdr->ctdb_version = CTDB_PROTOCOL;
1436         hdr->generation   = ctdb->vnn_map->generation;
1437         hdr->srcnode      = ctdb->pnn;
1438
1439         return hdr;     
1440 }
1441
1442 struct daemon_control_state {
1443         struct daemon_control_state *next, *prev;
1444         struct ctdb_client *client;
1445         struct ctdb_req_control_old *c;
1446         uint32_t reqid;
1447         struct ctdb_node *node;
1448 };
1449
1450 /*
1451   callback when a control reply comes in
1452  */
1453 static void daemon_control_callback(struct ctdb_context *ctdb,
1454                                     int32_t status, TDB_DATA data, 
1455                                     const char *errormsg,
1456                                     void *private_data)
1457 {
1458         struct daemon_control_state *state = talloc_get_type(private_data, 
1459                                                              struct daemon_control_state);
1460         struct ctdb_client *client = state->client;
1461         struct ctdb_reply_control_old *r;
1462         size_t len;
1463         int ret;
1464
1465         /* construct a message to send to the client containing the data */
1466         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1467         if (errormsg) {
1468                 len += strlen(errormsg);
1469         }
1470         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1471                                struct ctdb_reply_control_old);
1472         CTDB_NO_MEMORY_VOID(ctdb, r);
1473
1474         r->hdr.reqid     = state->reqid;
1475         r->status        = status;
1476         r->datalen       = data.dsize;
1477         r->errorlen = 0;
1478         memcpy(&r->data[0], data.dptr, data.dsize);
1479         if (errormsg) {
1480                 r->errorlen = strlen(errormsg);
1481                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1482         }
1483
1484         ret = daemon_queue_send(client, &r->hdr);
1485         if (ret != -1) {
1486                 talloc_free(state);
1487         }
1488 }
1489
1490 /*
1491   fail all pending controls to a disconnected node
1492  */
1493 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1494 {
1495         struct daemon_control_state *state;
1496         while ((state = node->pending_controls)) {
1497                 DLIST_REMOVE(node->pending_controls, state);
1498                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1499                                         "node is disconnected", state);
1500         }
1501 }
1502
1503 /*
1504   destroy a daemon_control_state
1505  */
1506 static int daemon_control_destructor(struct daemon_control_state *state)
1507 {
1508         if (state->node) {
1509                 DLIST_REMOVE(state->node->pending_controls, state);
1510         }
1511         return 0;
1512 }
1513
1514 /*
1515   this is called when the ctdb daemon received a ctdb request control
1516   from a local client over the unix domain socket
1517  */
1518 static void daemon_request_control_from_client(struct ctdb_client *client, 
1519                                                struct ctdb_req_control_old *c)
1520 {
1521         TDB_DATA data;
1522         int res;
1523         struct daemon_control_state *state;
1524         TALLOC_CTX *tmp_ctx = talloc_new(client);
1525
1526         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1527                 c->hdr.destnode = client->ctdb->pnn;
1528         }
1529
1530         state = talloc(client, struct daemon_control_state);
1531         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1532
1533         state->client = client;
1534         state->c = talloc_steal(state, c);
1535         state->reqid = c->hdr.reqid;
1536         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1537                 state->node = client->ctdb->nodes[c->hdr.destnode];
1538                 DLIST_ADD(state->node->pending_controls, state);
1539         } else {
1540                 state->node = NULL;
1541         }
1542
1543         talloc_set_destructor(state, daemon_control_destructor);
1544
1545         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1546                 talloc_steal(tmp_ctx, state);
1547         }
1548         
1549         data.dptr = &c->data[0];
1550         data.dsize = c->datalen;
1551         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1552                                        c->srvid, c->opcode, client->client_id,
1553                                        c->flags,
1554                                        data, daemon_control_callback,
1555                                        state);
1556         if (res != 0) {
1557                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1558                          c->hdr.destnode));
1559         }
1560
1561         talloc_free(tmp_ctx);
1562 }
1563
1564 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
1565                                               struct ctdb_req_tunnel_old *c)
1566 {
1567         TDB_DATA data;
1568         int ret;
1569
1570         if (! ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1571                 DEBUG(DEBUG_ERR, ("Invalid destination 0x%x\n",
1572                                   c->hdr.destnode));
1573                 return;
1574         }
1575
1576         ret = srvid_exists(client->ctdb->tunnels, c->tunnel_id, NULL);
1577         if (ret != 0) {
1578                 DEBUG(DEBUG_ERR,
1579                       ("tunnel id 0x%"PRIx64" not registered, dropping pkt\n",
1580                        c->tunnel_id));
1581                 return;
1582         }
1583
1584         data = (TDB_DATA) {
1585                 .dsize = c->datalen,
1586                 .dptr = &c->data[0],
1587         };
1588
1589         ret = ctdb_daemon_send_tunnel(client->ctdb, c->hdr.destnode,
1590                                       c->tunnel_id, c->flags, data);
1591         if (ret != 0) {
1592                 DEBUG(DEBUG_ERR, ("Failed to set tunnel to remote note %u\n",
1593                                   c->hdr.destnode));
1594         }
1595 }
1596
1597 /*
1598   register a call function
1599 */
1600 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1601                          ctdb_fn_t fn, int id)
1602 {
1603         struct ctdb_registered_call *call;
1604         struct ctdb_db_context *ctdb_db;
1605
1606         ctdb_db = find_ctdb_db(ctdb, db_id);
1607         if (ctdb_db == NULL) {
1608                 return -1;
1609         }
1610
1611         call = talloc(ctdb_db, struct ctdb_registered_call);
1612         call->fn = fn;
1613         call->id = id;
1614
1615         DLIST_ADD(ctdb_db->calls, call);        
1616         return 0;
1617 }
1618
1619
1620
1621 /*
1622   this local messaging handler is ugly, but is needed to prevent
1623   recursion in ctdb_send_message() when the destination node is the
1624   same as the source node
1625  */
1626 struct ctdb_local_message {
1627         struct ctdb_context *ctdb;
1628         uint64_t srvid;
1629         TDB_DATA data;
1630 };
1631
1632 static void ctdb_local_message_trigger(struct tevent_context *ev,
1633                                        struct tevent_timer *te,
1634                                        struct timeval t, void *private_data)
1635 {
1636         struct ctdb_local_message *m = talloc_get_type(
1637                 private_data, struct ctdb_local_message);
1638
1639         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1640         talloc_free(m);
1641 }
1642
1643 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1644 {
1645         struct ctdb_local_message *m;
1646         m = talloc(ctdb, struct ctdb_local_message);
1647         CTDB_NO_MEMORY(ctdb, m);
1648
1649         m->ctdb = ctdb;
1650         m->srvid = srvid;
1651         m->data  = data;
1652         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1653         if (m->data.dptr == NULL) {
1654                 talloc_free(m);
1655                 return -1;
1656         }
1657
1658         /* this needs to be done as an event to prevent recursion */
1659         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1660                          ctdb_local_message_trigger, m);
1661         return 0;
1662 }
1663
1664 /*
1665   send a ctdb message
1666 */
1667 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1668                              uint64_t srvid, TDB_DATA data)
1669 {
1670         struct ctdb_req_message_old *r;
1671         int len;
1672
1673         if (ctdb->methods == NULL) {
1674                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1675                 return -1;
1676         }
1677
1678         /* see if this is a message to ourselves */
1679         if (pnn == ctdb->pnn) {
1680                 return ctdb_local_message(ctdb, srvid, data);
1681         }
1682
1683         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1684         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1685                                     struct ctdb_req_message_old);
1686         CTDB_NO_MEMORY(ctdb, r);
1687
1688         r->hdr.destnode  = pnn;
1689         r->srvid         = srvid;
1690         r->datalen       = data.dsize;
1691         memcpy(&r->data[0], data.dptr, data.dsize);
1692
1693         ctdb_queue_packet(ctdb, &r->hdr);
1694
1695         talloc_free(r);
1696         return 0;
1697 }
1698
1699
1700
1701 struct ctdb_client_notify_list {
1702         struct ctdb_client_notify_list *next, *prev;
1703         struct ctdb_context *ctdb;
1704         uint64_t srvid;
1705         TDB_DATA data;
1706 };
1707
1708
1709 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1710 {
1711         int ret;
1712
1713         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1714
1715         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1716         if (ret != 0) {
1717                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1718         }
1719
1720         return 0;
1721 }
1722
1723 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1724 {
1725         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1726         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1727         struct ctdb_client_notify_list *nl;
1728
1729         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1730
1731         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1732                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1733                 return -1;
1734         }
1735
1736         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1737                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1738                 return -1;
1739         }
1740
1741
1742         if (client == NULL) {
1743                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1744                 return -1;
1745         }
1746
1747         for(nl=client->notify; nl; nl=nl->next) {
1748                 if (nl->srvid == notify->srvid) {
1749                         break;
1750                 }
1751         }
1752         if (nl != NULL) {
1753                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1754                 return -1;
1755         }
1756
1757         nl = talloc(client, struct ctdb_client_notify_list);
1758         CTDB_NO_MEMORY(ctdb, nl);
1759         nl->ctdb       = ctdb;
1760         nl->srvid      = notify->srvid;
1761         nl->data.dsize = notify->len;
1762         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1763                                        nl->data.dsize);
1764         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1765         
1766         DLIST_ADD(client->notify, nl);
1767         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1768
1769         return 0;
1770 }
1771
1772 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1773 {
1774         uint64_t srvid = *(uint64_t *)indata.dptr;
1775         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1776         struct ctdb_client_notify_list *nl;
1777
1778         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1779
1780         if (client == NULL) {
1781                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1782                 return -1;
1783         }
1784
1785         for(nl=client->notify; nl; nl=nl->next) {
1786                 if (nl->srvid == srvid) {
1787                         break;
1788                 }
1789         }
1790         if (nl == NULL) {
1791                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1792                 return -1;
1793         }
1794
1795         DLIST_REMOVE(client->notify, nl);
1796         talloc_set_destructor(nl, NULL);
1797         talloc_free(nl);
1798
1799         return 0;
1800 }
1801
1802 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1803 {
1804         struct ctdb_client_pid_list *client_pid;
1805
1806         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1807                 if (client_pid->pid == pid) {
1808                         return client_pid->client;
1809                 }
1810         }
1811         return NULL;
1812 }
1813
1814
1815 /* This control is used by samba when probing if a process (of a samba daemon)
1816    exists on the node.
1817    Samba does this when it needs/wants to check if a subrecord in one of the
1818    databases is still valid, or if it is stale and can be removed.
1819    If the node is in unhealthy or stopped state we just kill of the samba
1820    process holding this sub-record and return to the calling samba that
1821    the process does not exist.
1822    This allows us to forcefully recall subrecords registered by samba processes
1823    on banned and stopped nodes.
1824 */
1825 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1826 {
1827         struct ctdb_client *client;
1828
1829         client = ctdb_find_client_by_pid(ctdb, pid);
1830         if (client == NULL) {
1831                 return -1;
1832         }
1833
1834         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1835                 DEBUG(DEBUG_NOTICE,
1836                       ("Killing client with pid:%d on banned/stopped node\n",
1837                        (int)pid));
1838                 talloc_free(client);
1839                 return -1;
1840         }
1841
1842         return kill(pid, 0);
1843 }
1844
1845 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
1846                                      TDB_DATA indata)
1847 {
1848         struct ctdb_client_pid_list *client_pid;
1849         pid_t pid;
1850         uint64_t srvid;
1851         int ret;
1852
1853         pid = *(pid_t *)indata.dptr;
1854         srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
1855
1856         for (client_pid = ctdb->client_pids;
1857              client_pid != NULL;
1858              client_pid = client_pid->next) {
1859                 if (client_pid->pid == pid) {
1860                         ret = srvid_exists(ctdb->srv, srvid,
1861                                            client_pid->client);
1862                         if (ret == 0) {
1863                                 return 0;
1864                         }
1865                 }
1866         }
1867
1868         return -1;
1869 }
1870
1871 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1872 {
1873         struct ctdb_node_map_old *node_map = NULL;
1874
1875         CHECK_CONTROL_DATA_SIZE(0);
1876
1877         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1878         if (node_map == NULL) {
1879                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1880                 return -1;
1881         }
1882
1883         outdata->dptr  = (unsigned char *)node_map;
1884         outdata->dsize = talloc_get_size(outdata->dptr);
1885
1886         return 0;
1887 }
1888
1889 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1890 {
1891         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1892                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1893                 return;
1894         }
1895
1896         DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1897         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1898         ctdb_stop_recoverd(ctdb);
1899         ctdb_stop_keepalive(ctdb);
1900         ctdb_stop_monitoring(ctdb);
1901         ctdb_release_all_ips(ctdb);
1902         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1903         ctdb_stop_eventd(ctdb);
1904         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1905                 ctdb->methods->shutdown(ctdb);
1906         }
1907
1908         DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1909         exit(exit_code);
1910 }
1911
1912 /* When forking the main daemon and the child process needs to connect
1913  * back to the daemon as a client process, this function can be used
1914  * to change the ctdb context from daemon into client mode.  The child
1915  * process must be created using ctdb_fork() and not fork() -
1916  * ctdb_fork() does some necessary housekeeping.
1917  */
1918 int switch_from_server_to_client(struct ctdb_context *ctdb)
1919 {
1920         int ret;
1921
1922         /* get a new event context */
1923         ctdb->ev = tevent_context_init(ctdb);
1924         if (ctdb->ev == NULL) {
1925                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1926                 exit(1);
1927         }
1928         tevent_loop_allow_nesting(ctdb->ev);
1929
1930         /* Connect to main CTDB daemon */
1931         ret = ctdb_socket_connect(ctdb);
1932         if (ret != 0) {
1933                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1934                 return -1;
1935         }
1936
1937         ctdb->can_send_controls = true;
1938
1939         return 0;
1940 }