ctdb-daemon: Add code to process ctdb_req_tunnel packets
[metze/samba/wip.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
38
39 #include "common/version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
42
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
50
51 struct ctdb_client_pid_list {
52         struct ctdb_client_pid_list *next, *prev;
53         struct ctdb_context *ctdb;
54         pid_t pid;
55         struct ctdb_client *client;
56 };
57
58 const char *ctdbd_pidfile = NULL;
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
60
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
62
63 static pid_t __ctdbd_pid;
64
65 static void print_exit_message(void)
66 {
67         if (getpid() == __ctdbd_pid) {
68                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69
70                 /* Wait a second to allow pending log messages to be flushed */
71                 sleep(1);
72         }
73 }
74
75
76
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78                                   struct timeval t, void *private_data)
79 {
80         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
81
82         if (getpid() != ctdb->ctdbd_pid) {
83                 return;
84         }
85
86         tevent_add_timer(ctdb->ev, ctdb,
87                          timeval_current_ofs(1, 0),
88                          ctdb_time_tick, ctdb);
89 }
90
91 /* Used to trigger a dummy event once per second, to make
92  * detection of hangs more reliable.
93  */
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
95 {
96         tevent_add_timer(ctdb->ev, ctdb,
97                          timeval_current_ofs(1, 0),
98                          ctdb_time_tick, ctdb);
99 }
100
101 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
102 {
103         /* start monitoring for connected/disconnected nodes */
104         ctdb_start_keepalive(ctdb);
105
106         /* start periodic update of tcp tickle lists */
107         ctdb_start_tcp_tickle_update(ctdb);
108
109         /* start listening for recovery daemon pings */
110         ctdb_control_recd_ping(ctdb);
111
112         /* start listening to timer ticks */
113         ctdb_start_time_tickd(ctdb);
114 }
115
116 static void ignore_signal(int signum)
117 {
118         struct sigaction act;
119
120         memset(&act, 0, sizeof(act));
121
122         act.sa_handler = SIG_IGN;
123         sigemptyset(&act.sa_mask);
124         sigaddset(&act.sa_mask, signum);
125         sigaction(signum, &act, NULL);
126 }
127
128
129 /*
130   send a packet to a client
131  */
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
133 {
134         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
135         if (hdr->operation == CTDB_REQ_MESSAGE) {
136                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138                         talloc_free(client);
139                         return -1;
140                 }
141         }
142         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
143 }
144
145 /*
146   message handler for when we are in daemon mode. This redirects the message
147   to the right client
148  */
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
150                                    void *private_data)
151 {
152         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153         struct ctdb_req_message_old *r;
154         int len;
155
156         /* construct a message to send to the client containing the data */
157         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159                                len, struct ctdb_req_message_old);
160         CTDB_NO_MEMORY_VOID(client->ctdb, r);
161
162         talloc_set_name_const(r, "req_message packet");
163
164         r->srvid         = srvid;
165         r->datalen       = data.dsize;
166         memcpy(&r->data[0], data.dptr, data.dsize);
167
168         daemon_queue_send(client, &r->hdr);
169
170         talloc_free(r);
171 }
172
173 /*
174   this is called when the ctdb daemon received a ctdb request to 
175   set the srvid from the client
176  */
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
178 {
179         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
180         int res;
181         if (client == NULL) {
182                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
183                 return -1;
184         }
185         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
186                              client);
187         if (res != 0) {
188                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
189                          (unsigned long long)srvid));
190         } else {
191                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
192                          (unsigned long long)srvid));
193         }
194
195         return res;
196 }
197
198 /*
199   this is called when the ctdb daemon received a ctdb request to 
200   remove a srvid from the client
201  */
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
203 {
204         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205         if (client == NULL) {
206                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
207                 return -1;
208         }
209         return srvid_deregister(ctdb->srv, srvid, client);
210 }
211
212 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
213                            void *private_data)
214 {
215         struct ctdb_client *client =
216                 talloc_get_type_abort(private_data, struct ctdb_client);
217         struct ctdb_req_tunnel_old *c, *pkt;
218         size_t len;
219
220         pkt = (struct ctdb_req_tunnel_old *)data.dptr;
221
222         len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
223         c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
224                                len, struct ctdb_req_tunnel_old);
225         if (c == NULL) {
226                 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
227                 return;
228         }
229
230         talloc_set_name_const(c, "req_tunnel packet");
231
232         c->tunnel_id = tunnel_id;
233         c->flags = pkt->flags;
234         c->datalen = pkt->datalen;
235         memcpy(c->data, pkt->data, pkt->datalen);
236
237         daemon_queue_send(client, &c->hdr);
238
239         talloc_free(c);
240 }
241
242 /*
243   destroy a ctdb_client
244 */
245 static int ctdb_client_destructor(struct ctdb_client *client)
246 {
247         struct ctdb_db_context *ctdb_db;
248
249         ctdb_takeover_client_destructor_hook(client);
250         reqid_remove(client->ctdb->idr, client->client_id);
251         client->ctdb->num_clients--;
252
253         if (client->num_persistent_updates != 0) {
254                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
256         }
257         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
258         if (ctdb_db) {
259                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260                                   "commit active. Forcing recovery.\n"));
261                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
262
263                 /*
264                  * trans3 transaction state:
265                  *
266                  * The destructor sets the pointer to NULL.
267                  */
268                 talloc_free(ctdb_db->persistent_state);
269         }
270
271         return 0;
272 }
273
274
275 /*
276   this is called when the ctdb daemon received a ctdb request message
277   from a local client over the unix domain socket
278  */
279 static void daemon_request_message_from_client(struct ctdb_client *client, 
280                                                struct ctdb_req_message_old *c)
281 {
282         TDB_DATA data;
283         int res;
284
285         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
287         }
288
289         /* maybe the message is for another client on this node */
290         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292                 return;
293         }
294
295         /* its for a remote node */
296         data.dptr = &c->data[0];
297         data.dsize = c->datalen;
298         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
299                                        c->srvid, data);
300         if (res != 0) {
301                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
302                          c->hdr.destnode));
303         }
304 }
305
306
307 struct daemon_call_state {
308         struct ctdb_client *client;
309         uint32_t reqid;
310         struct ctdb_call *call;
311         struct timeval start_time;
312
313         /* readonly request ? */
314         uint32_t readonly_fetch;
315         uint32_t client_callid;
316 };
317
318 /* 
319    complete a call from a client 
320 */
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
322 {
323         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
324                                                            struct daemon_call_state);
325         struct ctdb_reply_call_old *r;
326         int res;
327         uint32_t length;
328         struct ctdb_client *client = dstate->client;
329         struct ctdb_db_context *ctdb_db = state->ctdb_db;
330
331         talloc_steal(client, dstate);
332         talloc_steal(dstate, dstate->call);
333
334         res = ctdb_daemon_call_recv(state, dstate->call);
335         if (res != 0) {
336                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
338
339                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340                 return;
341         }
342
343         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344         /* If the client asked for readonly FETCH, we remapped this to 
345            FETCH_WITH_HEADER when calling the daemon. So we must
346            strip the extra header off the reply data before passing
347            it back to the client.
348         */
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 length -= sizeof(struct ctdb_ltdb_header);
352         }
353
354         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
355                                length, struct ctdb_reply_call_old);
356         if (r == NULL) {
357                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
360                 return;
361         }
362         r->hdr.reqid        = dstate->reqid;
363         r->status           = dstate->call->status;
364
365         if (dstate->readonly_fetch
366         && dstate->client_callid == CTDB_FETCH_FUNC) {
367                 /* client only asked for a FETCH so we must strip off
368                    the extra ctdb_ltdb header
369                 */
370                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
372         } else {
373                 r->datalen          = dstate->call->reply_data.dsize;
374                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
375         }
376
377         res = daemon_queue_send(client, &r->hdr);
378         if (res == -1) {
379                 /* client is dead - return immediately */
380                 return;
381         }
382         if (res != 0) {
383                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
384         }
385         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
387         talloc_free(dstate);
388 }
389
390 struct ctdb_daemon_packet_wrap {
391         struct ctdb_context *ctdb;
392         uint32_t client_id;
393 };
394
395 /*
396   a wrapper to catch disconnected clients
397  */
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
399 {
400         struct ctdb_client *client;
401         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
402                                                             struct ctdb_daemon_packet_wrap);
403         if (w == NULL) {
404                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405                 return;
406         }
407
408         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409         if (client == NULL) {
410                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
411                          w->client_id));
412                 talloc_free(w);
413                 return;
414         }
415         talloc_free(w);
416
417         /* process it */
418         daemon_incoming_packet(client, hdr);    
419 }
420
421 struct ctdb_deferred_fetch_call {
422         struct ctdb_deferred_fetch_call *next, *prev;
423         struct ctdb_req_call_old *c;
424         struct ctdb_daemon_packet_wrap *w;
425 };
426
427 struct ctdb_deferred_fetch_queue {
428         struct ctdb_deferred_fetch_call *deferred_calls;
429 };
430
431 struct ctdb_deferred_requeue {
432         struct ctdb_deferred_fetch_call *dfc;
433         struct ctdb_client *client;
434 };
435
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438                                     struct tevent_timer *te,
439                                     struct timeval t, void *private_data)
440 {
441         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442         struct ctdb_client *client = dfr->client;
443
444         talloc_steal(client, dfr->dfc->c);
445         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446         talloc_free(dfr);
447 }
448
449 /* the referral context is destroyed either after a timeout or when the initial
450    fetch-lock has finished.
451    at this stage, immediately start reprocessing the queued up deferred
452    calls so they get reprocessed immediately (and since we are dmaster at
453    this stage, trigger the waiting smbd processes to pick up and aquire the
454    record right away.
455 */
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
457 {
458
459         /* need to reprocess the packets from the queue explicitely instead of
460            just using a normal destructor since we want, need, to
461            call the clients in the same oder as the requests queued up
462         */
463         while (dfq->deferred_calls != NULL) {
464                 struct ctdb_client *client;
465                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466                 struct ctdb_deferred_requeue *dfr;
467
468                 DLIST_REMOVE(dfq->deferred_calls, dfc);
469
470                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471                 if (client == NULL) {
472                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
473                                  dfc->w->client_id));
474                         continue;
475                 }
476
477                 /* process it by pushing it back onto the eventloop */
478                 dfr = talloc(client, struct ctdb_deferred_requeue);
479                 if (dfr == NULL) {
480                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481                         continue;
482                 }
483
484                 dfr->dfc    = talloc_steal(dfr, dfc);
485                 dfr->client = client;
486
487                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488                                  reprocess_deferred_call, dfr);
489         }
490
491         return 0;
492 }
493
494 /* insert the new deferral context into the rb tree.
495    there should never be a pre-existing context here, but check for it
496    warn and destroy the previous context if there is already a deferral context
497    for this key.
498 */
499 static void *insert_dfq_callback(void *parm, void *data)
500 {
501         if (data) {
502                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
503                 talloc_free(data);
504         }
505         return parm;
506 }
507
508 /* if the original fetch-lock did not complete within a reasonable time,
509    free the context and context for all deferred requests to cause them to be
510    re-inserted into the event system.
511 */
512 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
513                         struct timeval t, void *private_data)
514 {
515         talloc_free(private_data);
516 }
517
518 /* This function is used in the local daemon to register a KEY in a database
519    for being "fetched"
520    While the remote fetch is in-flight, any futher attempts to re-fetch the
521    same record will be deferred until the fetch completes.
522 */
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
524 {
525         uint32_t *k;
526         struct ctdb_deferred_fetch_queue *dfq;
527
528         k = ctdb_key_to_idkey(call, call->key);
529         if (k == NULL) {
530                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531                 return -1;
532         }
533
534         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
535         if (dfq == NULL) {
536                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537                 talloc_free(k);
538                 return -1;
539         }
540         dfq->deferred_calls = NULL;
541
542         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
543
544         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
545
546         /* if the fetch havent completed in 30 seconds, just tear it all down
547            and let it try again as the events are reissued */
548         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
549                          dfq_timeout, dfq);
550
551         talloc_free(k);
552         return 0;
553 }
554
555 /* check if this is a duplicate request to a fetch already in-flight
556    if it is, make this call deferred to be reprocessed later when
557    the in-flight fetch completes.
558 */
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
560 {
561         uint32_t *k;
562         struct ctdb_deferred_fetch_queue *dfq;
563         struct ctdb_deferred_fetch_call *dfc;
564
565         k = ctdb_key_to_idkey(c, key);
566         if (k == NULL) {
567                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568                 return -1;
569         }
570
571         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
572         if (dfq == NULL) {
573                 talloc_free(k);
574                 return -1;
575         }
576
577
578         talloc_free(k);
579
580         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
581         if (dfc == NULL) {
582                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583                 return -1;
584         }
585
586         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587         if (dfc->w == NULL) {
588                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
589                 talloc_free(dfc);
590                 return -1;
591         }
592
593         dfc->c = talloc_steal(dfc, c);
594         dfc->w->ctdb = ctdb_db->ctdb;
595         dfc->w->client_id = client->client_id;
596
597         DLIST_ADD_END(dfq->deferred_calls, dfc);
598
599         return 0;
600 }
601
602
603 /*
604   this is called when the ctdb daemon received a ctdb request call
605   from a local client over the unix domain socket
606  */
607 static void daemon_request_call_from_client(struct ctdb_client *client, 
608                                             struct ctdb_req_call_old *c)
609 {
610         struct ctdb_call_state *state;
611         struct ctdb_db_context *ctdb_db;
612         struct daemon_call_state *dstate;
613         struct ctdb_call *call;
614         struct ctdb_ltdb_header header;
615         TDB_DATA key, data;
616         int ret;
617         struct ctdb_context *ctdb = client->ctdb;
618         struct ctdb_daemon_packet_wrap *w;
619
620         CTDB_INCREMENT_STAT(ctdb, total_calls);
621         CTDB_INCREMENT_STAT(ctdb, pending_calls);
622
623         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
624         if (!ctdb_db) {
625                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
626                           c->db_id));
627                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628                 return;
629         }
630
631         if (ctdb_db->unhealthy_reason) {
632                 /*
633                  * this is just a warning, as the tdb should be empty anyway,
634                  * and only persistent databases can be unhealthy, which doesn't
635                  * use this code patch
636                  */
637                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
639         }
640
641         key.dptr = c->data;
642         key.dsize = c->keylen;
643
644         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645         CTDB_NO_MEMORY_VOID(ctdb, w);   
646
647         w->ctdb = ctdb;
648         w->client_id = client->client_id;
649
650         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
651                                            (struct ctdb_req_header *)c, &data,
652                                            daemon_incoming_packet_wrap, w, true);
653         if (ret == -2) {
654                 /* will retry later */
655                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
656                 return;
657         }
658
659         talloc_free(w);
660
661         if (ret != 0) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664                 return;
665         }
666
667
668         /* check if this fetch request is a duplicate for a
669            request we already have in flight. If so defer it until
670            the first request completes.
671         */
672         if (ctdb->tunable.fetch_collapse == 1) {
673                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674                         ret = ctdb_ltdb_unlock(ctdb_db, key);
675                         if (ret != 0) {
676                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
677                         }
678                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
679                         talloc_free(data.dptr);
680                         return;
681                 }
682         }
683
684         /* Dont do READONLY if we don't have a tracking database */
685         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
686                 c->flags &= ~CTDB_WANT_READONLY;
687         }
688
689         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
690                 header.flags &= ~CTDB_REC_RO_FLAGS;
691                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
692                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
693                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
694                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
695                 }
696                 /* and clear out the tracking data */
697                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
698                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
699                 }
700         }
701
702         /* if we are revoking, we must defer all other calls until the revoke
703          * had completed.
704          */
705         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
706                 talloc_free(data.dptr);
707                 ret = ctdb_ltdb_unlock(ctdb_db, key);
708
709                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
710                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
711                 }
712                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
713                 return;
714         }
715
716         if ((header.dmaster == ctdb->pnn)
717         && (!(c->flags & CTDB_WANT_READONLY))
718         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
719                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
720                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
721                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
722                 }
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724
725                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
726                         ctdb_fatal(ctdb, "Failed to start record revoke");
727                 }
728                 talloc_free(data.dptr);
729
730                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
731                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
732                 }
733
734                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
735                 return;
736         }               
737
738         dstate = talloc(client, struct daemon_call_state);
739         if (dstate == NULL) {
740                 ret = ctdb_ltdb_unlock(ctdb_db, key);
741                 if (ret != 0) {
742                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
743                 }
744
745                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
746                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
747                 return;
748         }
749         dstate->start_time = timeval_current();
750         dstate->client = client;
751         dstate->reqid  = c->hdr.reqid;
752         talloc_steal(dstate, data.dptr);
753
754         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
755         if (call == NULL) {
756                 ret = ctdb_ltdb_unlock(ctdb_db, key);
757                 if (ret != 0) {
758                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
759                 }
760
761                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
762                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
763                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
764                 return;
765         }
766
767         dstate->readonly_fetch = 0;
768         call->call_id = c->callid;
769         call->key = key;
770         call->call_data.dptr = c->data + c->keylen;
771         call->call_data.dsize = c->calldatalen;
772         call->flags = c->flags;
773
774         if (c->flags & CTDB_WANT_READONLY) {
775                 /* client wants readonly record, so translate this into a 
776                    fetch with header. remember what the client asked for
777                    so we can remap the reply back to the proper format for
778                    the client in the reply
779                  */
780                 dstate->client_callid = call->call_id;
781                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
782                 dstate->readonly_fetch = 1;
783         }
784
785         if (header.dmaster == ctdb->pnn) {
786                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
787         } else {
788                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
789                 if (ctdb->tunable.fetch_collapse == 1) {
790                         /* This request triggered a remote fetch-lock.
791                            set up a deferral for this key so any additional
792                            fetch-locks are deferred until the current one
793                            finishes.
794                          */
795                         setup_deferred_fetch_locks(ctdb_db, call);
796                 }
797         }
798
799         ret = ctdb_ltdb_unlock(ctdb_db, key);
800         if (ret != 0) {
801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
802         }
803
804         if (state == NULL) {
805                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
806                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
807                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
808                 return;
809         }
810         talloc_steal(state, dstate);
811         talloc_steal(client, state);
812
813         state->async.fn = daemon_call_from_client_callback;
814         state->async.private_data = dstate;
815 }
816
817
818 static void daemon_request_control_from_client(struct ctdb_client *client, 
819                                                struct ctdb_req_control_old *c);
820 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
821                                               struct ctdb_req_tunnel_old *c);
822
823 /* data contains a packet from the client */
824 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
825 {
826         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
827         TALLOC_CTX *tmp_ctx;
828         struct ctdb_context *ctdb = client->ctdb;
829
830         /* place the packet as a child of a tmp_ctx. We then use
831            talloc_free() below to free it. If any of the calls want
832            to keep it, then they will steal it somewhere else, and the
833            talloc_free() will be a no-op */
834         tmp_ctx = talloc_new(client);
835         talloc_steal(tmp_ctx, hdr);
836
837         if (hdr->ctdb_magic != CTDB_MAGIC) {
838                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
839                 goto done;
840         }
841
842         if (hdr->ctdb_version != CTDB_PROTOCOL) {
843                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
844                 goto done;
845         }
846
847         switch (hdr->operation) {
848         case CTDB_REQ_CALL:
849                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
850                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
851                 break;
852
853         case CTDB_REQ_MESSAGE:
854                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
855                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
856                 break;
857
858         case CTDB_REQ_CONTROL:
859                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
860                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
861                 break;
862
863         case CTDB_REQ_TUNNEL:
864                 CTDB_INCREMENT_STAT(ctdb, client.req_tunnel);
865                 daemon_request_tunnel_from_client(client, (struct ctdb_req_tunnel_old *)hdr);
866                 break;
867
868         default:
869                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
870                          hdr->operation));
871         }
872
873 done:
874         talloc_free(tmp_ctx);
875 }
876
877 /*
878   called when the daemon gets a incoming packet
879  */
880 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
881 {
882         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
883         struct ctdb_req_header *hdr;
884
885         if (cnt == 0) {
886                 talloc_free(client);
887                 return;
888         }
889
890         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
891
892         if (cnt < sizeof(*hdr)) {
893                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
894                                (unsigned)cnt);
895                 return;
896         }
897         hdr = (struct ctdb_req_header *)data;
898         if (cnt != hdr->length) {
899                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
900                                (unsigned)hdr->length, (unsigned)cnt);
901                 return;
902         }
903
904         if (hdr->ctdb_magic != CTDB_MAGIC) {
905                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
906                 return;
907         }
908
909         if (hdr->ctdb_version != CTDB_PROTOCOL) {
910                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
911                 return;
912         }
913
914         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
915                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
916                  hdr->srcnode, hdr->destnode));
917
918         /* it is the responsibility of the incoming packet function to free 'data' */
919         daemon_incoming_packet(client, hdr);
920 }
921
922
923 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
924 {
925         if (client_pid->ctdb->client_pids != NULL) {
926                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
927         }
928
929         return 0;
930 }
931
932
933 static void ctdb_accept_client(struct tevent_context *ev,
934                                struct tevent_fd *fde, uint16_t flags,
935                                void *private_data)
936 {
937         struct sockaddr_un addr;
938         socklen_t len;
939         int fd;
940         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
941         struct ctdb_client *client;
942         struct ctdb_client_pid_list *client_pid;
943         pid_t peer_pid = 0;
944         int ret;
945
946         memset(&addr, 0, sizeof(addr));
947         len = sizeof(addr);
948         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
949         if (fd == -1) {
950                 return;
951         }
952
953         ret = set_blocking(fd, false);
954         if (ret != 0) {
955                 DEBUG(DEBUG_ERR,
956                       (__location__
957                        " failed to set socket non-blocking (%s)\n",
958                        strerror(errno)));
959                 close(fd);
960                 return;
961         }
962
963         set_close_on_exec(fd);
964
965         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
966
967         client = talloc_zero(ctdb, struct ctdb_client);
968         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
969                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
970         }
971
972         client->ctdb = ctdb;
973         client->fd = fd;
974         client->client_id = reqid_new(ctdb->idr, client);
975         client->pid = peer_pid;
976
977         client_pid = talloc(client, struct ctdb_client_pid_list);
978         if (client_pid == NULL) {
979                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
980                 close(fd);
981                 talloc_free(client);
982                 return;
983         }               
984         client_pid->ctdb   = ctdb;
985         client_pid->pid    = peer_pid;
986         client_pid->client = client;
987
988         DLIST_ADD(ctdb->client_pids, client_pid);
989
990         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
991                                          ctdb_daemon_read_cb, client,
992                                          "client-%u", client->pid);
993
994         talloc_set_destructor(client, ctdb_client_destructor);
995         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
996         ctdb->num_clients++;
997 }
998
999
1000
1001 /*
1002   create a unix domain socket and bind it
1003   return a file descriptor open on the socket 
1004 */
1005 static int ux_socket_bind(struct ctdb_context *ctdb)
1006 {
1007         struct sockaddr_un addr;
1008         int ret;
1009
1010         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1011         if (ctdb->daemon.sd == -1) {
1012                 return -1;
1013         }
1014
1015         memset(&addr, 0, sizeof(addr));
1016         addr.sun_family = AF_UNIX;
1017         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1018
1019         if (! sock_clean(ctdb->daemon.name)) {
1020                 return -1;
1021         }
1022
1023         set_close_on_exec(ctdb->daemon.sd);
1024
1025         ret = set_blocking(ctdb->daemon.sd, false);
1026         if (ret != 0) {
1027                 DEBUG(DEBUG_ERR,
1028                       (__location__
1029                        " failed to set socket non-blocking (%s)\n",
1030                        strerror(errno)));
1031                 goto failed;
1032         }
1033
1034         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1035                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1036                 goto failed;
1037         }
1038
1039         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1040             chmod(ctdb->daemon.name, 0700) != 0) {
1041                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1042                 goto failed;
1043         }
1044
1045
1046         if (listen(ctdb->daemon.sd, 100) != 0) {
1047                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1048                 goto failed;
1049         }
1050
1051         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1052                              ctdb->daemon.name));
1053         return 0;
1054
1055 failed:
1056         close(ctdb->daemon.sd);
1057         ctdb->daemon.sd = -1;
1058         return -1;      
1059 }
1060
1061 static void initialise_node_flags (struct ctdb_context *ctdb)
1062 {
1063         if (ctdb->pnn == -1) {
1064                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1065         }
1066
1067         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1068
1069         /* do we start out in DISABLED mode? */
1070         if (ctdb->start_as_disabled != 0) {
1071                 DEBUG(DEBUG_ERR,
1072                       ("This node is configured to start in DISABLED state\n"));
1073                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1074         }
1075         /* do we start out in STOPPED mode? */
1076         if (ctdb->start_as_stopped != 0) {
1077                 DEBUG(DEBUG_ERR,
1078                       ("This node is configured to start in STOPPED state\n"));
1079                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1080         }
1081 }
1082
1083 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1084                                       void *private_data)
1085 {
1086         if (status != 0) {
1087                 ctdb_die(ctdb, "Failed to run setup event");
1088         }
1089         ctdb_run_notification_script(ctdb, "setup");
1090
1091         /* tell all other nodes we've just started up */
1092         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1093                                  0, CTDB_CONTROL_STARTUP, 0,
1094                                  CTDB_CTRL_FLAG_NOREPLY,
1095                                  tdb_null, NULL, NULL);
1096
1097         /* Start the recovery daemon */
1098         if (ctdb_start_recoverd(ctdb) != 0) {
1099                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1100                 exit(11);
1101         }
1102
1103         ctdb_start_periodic_events(ctdb);
1104
1105         ctdb_wait_for_first_recovery(ctdb);
1106 }
1107
1108 static struct timeval tevent_before_wait_ts;
1109 static struct timeval tevent_after_wait_ts;
1110
1111 static void ctdb_tevent_trace_init(void)
1112 {
1113         struct timeval now;
1114
1115         now = timeval_current();
1116
1117         tevent_before_wait_ts = now;
1118         tevent_after_wait_ts = now;
1119 }
1120
1121 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1122                               void *private_data)
1123 {
1124         struct timeval diff;
1125         struct timeval now;
1126         struct ctdb_context *ctdb =
1127                 talloc_get_type(private_data, struct ctdb_context);
1128
1129         if (getpid() != ctdb->ctdbd_pid) {
1130                 return;
1131         }
1132
1133         now = timeval_current();
1134
1135         switch (tp) {
1136         case TEVENT_TRACE_BEFORE_WAIT:
1137                 diff = timeval_until(&tevent_after_wait_ts, &now);
1138                 if (diff.tv_sec > 3) {
1139                         DEBUG(DEBUG_ERR,
1140                               ("Handling event took %ld seconds!\n",
1141                                (long)diff.tv_sec));
1142                 }
1143                 tevent_before_wait_ts = now;
1144                 break;
1145
1146         case TEVENT_TRACE_AFTER_WAIT:
1147                 diff = timeval_until(&tevent_before_wait_ts, &now);
1148                 if (diff.tv_sec > 3) {
1149                         DEBUG(DEBUG_ERR,
1150                               ("No event for %ld seconds!\n",
1151                                (long)diff.tv_sec));
1152                 }
1153                 tevent_after_wait_ts = now;
1154                 break;
1155
1156         default:
1157                 /* Do nothing for future tevent trace points */ ;
1158         }
1159 }
1160
1161 static void ctdb_remove_pidfile(void)
1162 {
1163         TALLOC_FREE(ctdbd_pidfile_ctx);
1164 }
1165
1166 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1167 {
1168         if (ctdbd_pidfile != NULL) {
1169                 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1170                                                  &ctdbd_pidfile_ctx);
1171                 if (ret != 0) {
1172                         DEBUG(DEBUG_ERR,
1173                               ("Failed to create PID file %s\n",
1174                                ctdbd_pidfile));
1175                         exit(11);
1176                 }
1177
1178                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1179                 atexit(ctdb_remove_pidfile);
1180         }
1181 }
1182
1183 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1184 {
1185         int i, j, count;
1186
1187         /* initialize the vnn mapping table, skipping any deleted nodes */
1188         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1189         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1190
1191         count = 0;
1192         for (i = 0; i < ctdb->num_nodes; i++) {
1193                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1194                         count++;
1195                 }
1196         }
1197
1198         ctdb->vnn_map->generation = INVALID_GENERATION;
1199         ctdb->vnn_map->size = count;
1200         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1201         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1202
1203         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1204                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1205                         continue;
1206                 }
1207                 ctdb->vnn_map->map[j] = i;
1208                 j++;
1209         }
1210 }
1211
1212 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1213 {
1214         int nodeid;
1215
1216         if (ctdb->address == NULL) {
1217                 ctdb_fatal(ctdb,
1218                            "Can not determine PNN - node address is not set\n");
1219         }
1220
1221         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1222         if (nodeid == -1) {
1223                 ctdb_fatal(ctdb,
1224                            "Can not determine PNN - node address not found in node list\n");
1225         }
1226
1227         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1228         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1229 }
1230
1231 /*
1232   start the protocol going as a daemon
1233 */
1234 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1235 {
1236         int res, ret = -1;
1237         struct tevent_fd *fde;
1238
1239         become_daemon(do_fork, false, false);
1240
1241         ignore_signal(SIGPIPE);
1242         ignore_signal(SIGUSR1);
1243
1244         ctdb->ctdbd_pid = getpid();
1245         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1246                           ctdb_version_string, ctdb->ctdbd_pid));
1247         ctdb_create_pidfile(ctdb);
1248
1249         /* create a unix domain stream socket to listen to */
1250         res = ux_socket_bind(ctdb);
1251         if (res!=0) {
1252                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1253                 exit(10);
1254         }
1255
1256         /* Make sure we log something when the daemon terminates.
1257          * This must be the first exit handler to run (so the last to
1258          * be registered.
1259          */
1260         __ctdbd_pid = getpid();
1261         atexit(print_exit_message);
1262
1263         if (ctdb->do_setsched) {
1264                 /* try to set us up as realtime */
1265                 if (!set_scheduler()) {
1266                         exit(1);
1267                 }
1268                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1269         }
1270
1271         ctdb->ev = tevent_context_init(NULL);
1272         if (ctdb->ev == NULL) {
1273                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1274                 exit(1);
1275         }
1276         tevent_loop_allow_nesting(ctdb->ev);
1277         ctdb_tevent_trace_init();
1278         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1279
1280         /* set up a handler to pick up sigchld */
1281         if (ctdb_init_sigchld(ctdb) == NULL) {
1282                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1283                 exit(1);
1284         }
1285
1286         if (do_fork) {
1287                 ctdb_set_child_logging(ctdb);
1288         }
1289
1290         TALLOC_FREE(ctdb->srv);
1291         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1292                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1293                 exit(1);
1294         }
1295
1296         TALLOC_FREE(ctdb->tunnels);
1297         if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
1298                 DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
1299                 exit(1);
1300         }
1301
1302         /* initialize statistics collection */
1303         ctdb_statistics_init(ctdb);
1304
1305         /* force initial recovery for election */
1306         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1307
1308         if (ctdb_start_eventd(ctdb) != 0) {
1309                 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1310                 exit(1);
1311         }
1312
1313         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1314         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1315         if (ret != 0) {
1316                 ctdb_die(ctdb, "Failed to run init event\n");
1317         }
1318         ctdb_run_notification_script(ctdb, "init");
1319
1320         if (strcmp(ctdb->transport, "tcp") == 0) {
1321                 ret = ctdb_tcp_init(ctdb);
1322         }
1323 #ifdef USE_INFINIBAND
1324         if (strcmp(ctdb->transport, "ib") == 0) {
1325                 ret = ctdb_ibw_init(ctdb);
1326         }
1327 #endif
1328         if (ret != 0) {
1329                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1330                 return -1;
1331         }
1332
1333         if (ctdb->methods == NULL) {
1334                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1335                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1336         }
1337
1338         /* Initialise the transport.  This sets the node address if it
1339          * was not set via the command-line. */
1340         if (ctdb->methods->initialise(ctdb) != 0) {
1341                 ctdb_fatal(ctdb, "transport failed to initialise");
1342         }
1343
1344         ctdb_set_my_pnn(ctdb);
1345
1346         initialise_node_flags(ctdb);
1347
1348         if (ctdb->public_addresses_file) {
1349                 ret = ctdb_set_public_addresses(ctdb, true);
1350                 if (ret == -1) {
1351                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1352                         exit(1);
1353                 }
1354         }
1355
1356         ctdb_initialise_vnn_map(ctdb);
1357
1358         /* attach to existing databases */
1359         if (ctdb_attach_databases(ctdb) != 0) {
1360                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1361         }
1362
1363         /* start frozen, then let the first election sort things out */
1364         if (!ctdb_blocking_freeze(ctdb)) {
1365                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1366         }
1367
1368         /* now start accepting clients, only can do this once frozen */
1369         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1370                             ctdb_accept_client, ctdb);
1371         if (fde == NULL) {
1372                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1373         }
1374         tevent_fd_set_auto_close(fde);
1375
1376         /* Start the transport */
1377         if (ctdb->methods->start(ctdb) != 0) {
1378                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1379                 ctdb_fatal(ctdb, "transport failed to start");
1380         }
1381
1382         /* Recovery daemon and timed events are started from the
1383          * callback, only after the setup event completes
1384          * successfully.
1385          */
1386         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1387         ret = ctdb_event_script_callback(ctdb,
1388                                          ctdb,
1389                                          ctdb_setup_event_callback,
1390                                          ctdb,
1391                                          CTDB_EVENT_SETUP,
1392                                          "%s",
1393                                          "");
1394         if (ret != 0) {
1395                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1396                 exit(1);
1397         }
1398
1399         lockdown_memory(ctdb->valgrinding);
1400
1401         /* go into a wait loop to allow other nodes to complete */
1402         tevent_loop_wait(ctdb->ev);
1403
1404         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1405         exit(1);
1406 }
1407
1408 /*
1409   allocate a packet for use in daemon<->daemon communication
1410  */
1411 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1412                                                  TALLOC_CTX *mem_ctx, 
1413                                                  enum ctdb_operation operation, 
1414                                                  size_t length, size_t slength,
1415                                                  const char *type)
1416 {
1417         int size;
1418         struct ctdb_req_header *hdr;
1419
1420         length = MAX(length, slength);
1421         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1422
1423         if (ctdb->methods == NULL) {
1424                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1425                          operation, (unsigned)length));
1426                 return NULL;
1427         }
1428
1429         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1430         if (hdr == NULL) {
1431                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1432                          operation, (unsigned)length));
1433                 return NULL;
1434         }
1435         talloc_set_name_const(hdr, type);
1436         memset(hdr, 0, slength);
1437         hdr->length       = length;
1438         hdr->operation    = operation;
1439         hdr->ctdb_magic   = CTDB_MAGIC;
1440         hdr->ctdb_version = CTDB_PROTOCOL;
1441         hdr->generation   = ctdb->vnn_map->generation;
1442         hdr->srcnode      = ctdb->pnn;
1443
1444         return hdr;     
1445 }
1446
1447 struct daemon_control_state {
1448         struct daemon_control_state *next, *prev;
1449         struct ctdb_client *client;
1450         struct ctdb_req_control_old *c;
1451         uint32_t reqid;
1452         struct ctdb_node *node;
1453 };
1454
1455 /*
1456   callback when a control reply comes in
1457  */
1458 static void daemon_control_callback(struct ctdb_context *ctdb,
1459                                     int32_t status, TDB_DATA data, 
1460                                     const char *errormsg,
1461                                     void *private_data)
1462 {
1463         struct daemon_control_state *state = talloc_get_type(private_data, 
1464                                                              struct daemon_control_state);
1465         struct ctdb_client *client = state->client;
1466         struct ctdb_reply_control_old *r;
1467         size_t len;
1468         int ret;
1469
1470         /* construct a message to send to the client containing the data */
1471         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1472         if (errormsg) {
1473                 len += strlen(errormsg);
1474         }
1475         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1476                                struct ctdb_reply_control_old);
1477         CTDB_NO_MEMORY_VOID(ctdb, r);
1478
1479         r->hdr.reqid     = state->reqid;
1480         r->status        = status;
1481         r->datalen       = data.dsize;
1482         r->errorlen = 0;
1483         memcpy(&r->data[0], data.dptr, data.dsize);
1484         if (errormsg) {
1485                 r->errorlen = strlen(errormsg);
1486                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1487         }
1488
1489         ret = daemon_queue_send(client, &r->hdr);
1490         if (ret != -1) {
1491                 talloc_free(state);
1492         }
1493 }
1494
1495 /*
1496   fail all pending controls to a disconnected node
1497  */
1498 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1499 {
1500         struct daemon_control_state *state;
1501         while ((state = node->pending_controls)) {
1502                 DLIST_REMOVE(node->pending_controls, state);
1503                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1504                                         "node is disconnected", state);
1505         }
1506 }
1507
1508 /*
1509   destroy a daemon_control_state
1510  */
1511 static int daemon_control_destructor(struct daemon_control_state *state)
1512 {
1513         if (state->node) {
1514                 DLIST_REMOVE(state->node->pending_controls, state);
1515         }
1516         return 0;
1517 }
1518
1519 /*
1520   this is called when the ctdb daemon received a ctdb request control
1521   from a local client over the unix domain socket
1522  */
1523 static void daemon_request_control_from_client(struct ctdb_client *client, 
1524                                                struct ctdb_req_control_old *c)
1525 {
1526         TDB_DATA data;
1527         int res;
1528         struct daemon_control_state *state;
1529         TALLOC_CTX *tmp_ctx = talloc_new(client);
1530
1531         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1532                 c->hdr.destnode = client->ctdb->pnn;
1533         }
1534
1535         state = talloc(client, struct daemon_control_state);
1536         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1537
1538         state->client = client;
1539         state->c = talloc_steal(state, c);
1540         state->reqid = c->hdr.reqid;
1541         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1542                 state->node = client->ctdb->nodes[c->hdr.destnode];
1543                 DLIST_ADD(state->node->pending_controls, state);
1544         } else {
1545                 state->node = NULL;
1546         }
1547
1548         talloc_set_destructor(state, daemon_control_destructor);
1549
1550         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1551                 talloc_steal(tmp_ctx, state);
1552         }
1553         
1554         data.dptr = &c->data[0];
1555         data.dsize = c->datalen;
1556         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1557                                        c->srvid, c->opcode, client->client_id,
1558                                        c->flags,
1559                                        data, daemon_control_callback,
1560                                        state);
1561         if (res != 0) {
1562                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1563                          c->hdr.destnode));
1564         }
1565
1566         talloc_free(tmp_ctx);
1567 }
1568
1569 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
1570                                               struct ctdb_req_tunnel_old *c)
1571 {
1572         TDB_DATA data;
1573         int ret;
1574
1575         if (! ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1576                 DEBUG(DEBUG_ERR, ("Invalid destination 0x%x\n",
1577                                   c->hdr.destnode));
1578                 return;
1579         }
1580
1581         ret = srvid_exists(client->ctdb->tunnels, c->tunnel_id, NULL);
1582         if (ret != 0) {
1583                 DEBUG(DEBUG_ERR,
1584                       ("tunnel id 0x%"PRIx64" not registered, dropping pkt\n",
1585                        c->tunnel_id));
1586                 return;
1587         }
1588
1589         data = (TDB_DATA) {
1590                 .dsize = c->datalen,
1591                 .dptr = &c->data[0],
1592         };
1593
1594         ret = ctdb_daemon_send_tunnel(client->ctdb, c->hdr.destnode,
1595                                       c->tunnel_id, c->flags, data);
1596         if (ret != 0) {
1597                 DEBUG(DEBUG_ERR, ("Failed to set tunnel to remote note %u\n",
1598                                   c->hdr.destnode));
1599         }
1600 }
1601
1602 /*
1603   register a call function
1604 */
1605 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1606                          ctdb_fn_t fn, int id)
1607 {
1608         struct ctdb_registered_call *call;
1609         struct ctdb_db_context *ctdb_db;
1610
1611         ctdb_db = find_ctdb_db(ctdb, db_id);
1612         if (ctdb_db == NULL) {
1613                 return -1;
1614         }
1615
1616         call = talloc(ctdb_db, struct ctdb_registered_call);
1617         call->fn = fn;
1618         call->id = id;
1619
1620         DLIST_ADD(ctdb_db->calls, call);        
1621         return 0;
1622 }
1623
1624
1625
1626 /*
1627   this local messaging handler is ugly, but is needed to prevent
1628   recursion in ctdb_send_message() when the destination node is the
1629   same as the source node
1630  */
1631 struct ctdb_local_message {
1632         struct ctdb_context *ctdb;
1633         uint64_t srvid;
1634         TDB_DATA data;
1635 };
1636
1637 static void ctdb_local_message_trigger(struct tevent_context *ev,
1638                                        struct tevent_timer *te,
1639                                        struct timeval t, void *private_data)
1640 {
1641         struct ctdb_local_message *m = talloc_get_type(
1642                 private_data, struct ctdb_local_message);
1643
1644         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1645         talloc_free(m);
1646 }
1647
1648 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1649 {
1650         struct ctdb_local_message *m;
1651         m = talloc(ctdb, struct ctdb_local_message);
1652         CTDB_NO_MEMORY(ctdb, m);
1653
1654         m->ctdb = ctdb;
1655         m->srvid = srvid;
1656         m->data  = data;
1657         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1658         if (m->data.dptr == NULL) {
1659                 talloc_free(m);
1660                 return -1;
1661         }
1662
1663         /* this needs to be done as an event to prevent recursion */
1664         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1665                          ctdb_local_message_trigger, m);
1666         return 0;
1667 }
1668
1669 /*
1670   send a ctdb message
1671 */
1672 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1673                              uint64_t srvid, TDB_DATA data)
1674 {
1675         struct ctdb_req_message_old *r;
1676         int len;
1677
1678         if (ctdb->methods == NULL) {
1679                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1680                 return -1;
1681         }
1682
1683         /* see if this is a message to ourselves */
1684         if (pnn == ctdb->pnn) {
1685                 return ctdb_local_message(ctdb, srvid, data);
1686         }
1687
1688         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1689         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1690                                     struct ctdb_req_message_old);
1691         CTDB_NO_MEMORY(ctdb, r);
1692
1693         r->hdr.destnode  = pnn;
1694         r->srvid         = srvid;
1695         r->datalen       = data.dsize;
1696         memcpy(&r->data[0], data.dptr, data.dsize);
1697
1698         ctdb_queue_packet(ctdb, &r->hdr);
1699
1700         talloc_free(r);
1701         return 0;
1702 }
1703
1704
1705
1706 struct ctdb_client_notify_list {
1707         struct ctdb_client_notify_list *next, *prev;
1708         struct ctdb_context *ctdb;
1709         uint64_t srvid;
1710         TDB_DATA data;
1711 };
1712
1713
1714 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1715 {
1716         int ret;
1717
1718         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1719
1720         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1721         if (ret != 0) {
1722                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1723         }
1724
1725         return 0;
1726 }
1727
1728 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1729 {
1730         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1731         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1732         struct ctdb_client_notify_list *nl;
1733
1734         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1735
1736         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1737                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1738                 return -1;
1739         }
1740
1741         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1742                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1743                 return -1;
1744         }
1745
1746
1747         if (client == NULL) {
1748                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1749                 return -1;
1750         }
1751
1752         for(nl=client->notify; nl; nl=nl->next) {
1753                 if (nl->srvid == notify->srvid) {
1754                         break;
1755                 }
1756         }
1757         if (nl != NULL) {
1758                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1759                 return -1;
1760         }
1761
1762         nl = talloc(client, struct ctdb_client_notify_list);
1763         CTDB_NO_MEMORY(ctdb, nl);
1764         nl->ctdb       = ctdb;
1765         nl->srvid      = notify->srvid;
1766         nl->data.dsize = notify->len;
1767         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1768                                        nl->data.dsize);
1769         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1770         
1771         DLIST_ADD(client->notify, nl);
1772         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1773
1774         return 0;
1775 }
1776
1777 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1778 {
1779         uint64_t srvid = *(uint64_t *)indata.dptr;
1780         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1781         struct ctdb_client_notify_list *nl;
1782
1783         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1784
1785         if (client == NULL) {
1786                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1787                 return -1;
1788         }
1789
1790         for(nl=client->notify; nl; nl=nl->next) {
1791                 if (nl->srvid == srvid) {
1792                         break;
1793                 }
1794         }
1795         if (nl == NULL) {
1796                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1797                 return -1;
1798         }
1799
1800         DLIST_REMOVE(client->notify, nl);
1801         talloc_set_destructor(nl, NULL);
1802         talloc_free(nl);
1803
1804         return 0;
1805 }
1806
1807 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1808 {
1809         struct ctdb_client_pid_list *client_pid;
1810
1811         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1812                 if (client_pid->pid == pid) {
1813                         return client_pid->client;
1814                 }
1815         }
1816         return NULL;
1817 }
1818
1819
1820 /* This control is used by samba when probing if a process (of a samba daemon)
1821    exists on the node.
1822    Samba does this when it needs/wants to check if a subrecord in one of the
1823    databases is still valid, or if it is stale and can be removed.
1824    If the node is in unhealthy or stopped state we just kill of the samba
1825    process holding this sub-record and return to the calling samba that
1826    the process does not exist.
1827    This allows us to forcefully recall subrecords registered by samba processes
1828    on banned and stopped nodes.
1829 */
1830 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1831 {
1832         struct ctdb_client *client;
1833
1834         client = ctdb_find_client_by_pid(ctdb, pid);
1835         if (client == NULL) {
1836                 return -1;
1837         }
1838
1839         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1840                 DEBUG(DEBUG_NOTICE,
1841                       ("Killing client with pid:%d on banned/stopped node\n",
1842                        (int)pid));
1843                 talloc_free(client);
1844                 return -1;
1845         }
1846
1847         return kill(pid, 0);
1848 }
1849
1850 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
1851                                      TDB_DATA indata)
1852 {
1853         struct ctdb_client_pid_list *client_pid;
1854         pid_t pid;
1855         uint64_t srvid;
1856         int ret;
1857
1858         pid = *(pid_t *)indata.dptr;
1859         srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
1860
1861         for (client_pid = ctdb->client_pids;
1862              client_pid != NULL;
1863              client_pid = client_pid->next) {
1864                 if (client_pid->pid == pid) {
1865                         ret = srvid_exists(ctdb->srv, srvid,
1866                                            client_pid->client);
1867                         if (ret == 0) {
1868                                 return 0;
1869                         }
1870                 }
1871         }
1872
1873         return -1;
1874 }
1875
1876 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1877 {
1878         struct ctdb_node_map_old *node_map = NULL;
1879
1880         CHECK_CONTROL_DATA_SIZE(0);
1881
1882         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1883         if (node_map == NULL) {
1884                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1885                 return -1;
1886         }
1887
1888         outdata->dptr  = (unsigned char *)node_map;
1889         outdata->dsize = talloc_get_size(outdata->dptr);
1890
1891         return 0;
1892 }
1893
1894 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1895 {
1896         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1897                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1898                 return;
1899         }
1900
1901         DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1902         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1903         ctdb_stop_recoverd(ctdb);
1904         ctdb_stop_keepalive(ctdb);
1905         ctdb_stop_monitoring(ctdb);
1906         ctdb_release_all_ips(ctdb);
1907         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1908         ctdb_stop_eventd(ctdb);
1909         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1910                 ctdb->methods->shutdown(ctdb);
1911         }
1912
1913         DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1914         exit(exit_code);
1915 }
1916
1917 /* When forking the main daemon and the child process needs to connect
1918  * back to the daemon as a client process, this function can be used
1919  * to change the ctdb context from daemon into client mode.  The child
1920  * process must be created using ctdb_fork() and not fork() -
1921  * ctdb_fork() does some necessary housekeeping.
1922  */
1923 int switch_from_server_to_client(struct ctdb_context *ctdb)
1924 {
1925         int ret;
1926
1927         /* get a new event context */
1928         ctdb->ev = tevent_context_init(ctdb);
1929         if (ctdb->ev == NULL) {
1930                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1931                 exit(1);
1932         }
1933         tevent_loop_allow_nesting(ctdb->ev);
1934
1935         /* Connect to main CTDB daemon */
1936         ret = ctdb_socket_connect(ctdb);
1937         if (ret != 0) {
1938                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1939                 return -1;
1940         }
1941
1942         ctdb->can_send_controls = true;
1943
1944         return 0;
1945 }