ctdb: Remove double sanity checks from ctdb_daemon_read_cb
[metze/samba/wip.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
38
39 #include "common/version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
42
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
50
51 struct ctdb_client_pid_list {
52         struct ctdb_client_pid_list *next, *prev;
53         struct ctdb_context *ctdb;
54         pid_t pid;
55         struct ctdb_client *client;
56 };
57
58 const char *ctdbd_pidfile = NULL;
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
60
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
62
63 static pid_t __ctdbd_pid;
64
65 static void print_exit_message(void)
66 {
67         if (getpid() == __ctdbd_pid) {
68                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69
70                 /* Wait a second to allow pending log messages to be flushed */
71                 sleep(1);
72         }
73 }
74
75
76
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78                                   struct timeval t, void *private_data)
79 {
80         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
81
82         if (getpid() != ctdb->ctdbd_pid) {
83                 return;
84         }
85
86         tevent_add_timer(ctdb->ev, ctdb,
87                          timeval_current_ofs(1, 0),
88                          ctdb_time_tick, ctdb);
89 }
90
91 /* Used to trigger a dummy event once per second, to make
92  * detection of hangs more reliable.
93  */
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
95 {
96         tevent_add_timer(ctdb->ev, ctdb,
97                          timeval_current_ofs(1, 0),
98                          ctdb_time_tick, ctdb);
99 }
100
101 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
102 {
103         /* start monitoring for connected/disconnected nodes */
104         ctdb_start_keepalive(ctdb);
105
106         /* start periodic update of tcp tickle lists */
107         ctdb_start_tcp_tickle_update(ctdb);
108
109         /* start listening for recovery daemon pings */
110         ctdb_control_recd_ping(ctdb);
111
112         /* start listening to timer ticks */
113         ctdb_start_time_tickd(ctdb);
114 }
115
116 static void ignore_signal(int signum)
117 {
118         struct sigaction act;
119
120         memset(&act, 0, sizeof(act));
121
122         act.sa_handler = SIG_IGN;
123         sigemptyset(&act.sa_mask);
124         sigaddset(&act.sa_mask, signum);
125         sigaction(signum, &act, NULL);
126 }
127
128
129 /*
130   send a packet to a client
131  */
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
133 {
134         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
135         if (hdr->operation == CTDB_REQ_MESSAGE) {
136                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138                         talloc_free(client);
139                         return -1;
140                 }
141         }
142         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
143 }
144
145 /*
146   message handler for when we are in daemon mode. This redirects the message
147   to the right client
148  */
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
150                                    void *private_data)
151 {
152         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153         struct ctdb_req_message_old *r;
154         int len;
155
156         /* construct a message to send to the client containing the data */
157         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159                                len, struct ctdb_req_message_old);
160         CTDB_NO_MEMORY_VOID(client->ctdb, r);
161
162         talloc_set_name_const(r, "req_message packet");
163
164         r->srvid         = srvid;
165         r->datalen       = data.dsize;
166         memcpy(&r->data[0], data.dptr, data.dsize);
167
168         daemon_queue_send(client, &r->hdr);
169
170         talloc_free(r);
171 }
172
173 /*
174   this is called when the ctdb daemon received a ctdb request to 
175   set the srvid from the client
176  */
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
178 {
179         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
180         int res;
181         if (client == NULL) {
182                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
183                 return -1;
184         }
185         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
186                              client);
187         if (res != 0) {
188                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
189                          (unsigned long long)srvid));
190         } else {
191                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
192                          (unsigned long long)srvid));
193         }
194
195         return res;
196 }
197
198 /*
199   this is called when the ctdb daemon received a ctdb request to 
200   remove a srvid from the client
201  */
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
203 {
204         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205         if (client == NULL) {
206                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
207                 return -1;
208         }
209         return srvid_deregister(ctdb->srv, srvid, client);
210 }
211
212 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
213                            void *private_data)
214 {
215         struct ctdb_client *client =
216                 talloc_get_type_abort(private_data, struct ctdb_client);
217         struct ctdb_req_tunnel_old *c, *pkt;
218         size_t len;
219
220         pkt = (struct ctdb_req_tunnel_old *)data.dptr;
221
222         len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
223         c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
224                                len, struct ctdb_req_tunnel_old);
225         if (c == NULL) {
226                 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
227                 return;
228         }
229
230         talloc_set_name_const(c, "req_tunnel packet");
231
232         c->tunnel_id = tunnel_id;
233         c->flags = pkt->flags;
234         c->datalen = pkt->datalen;
235         memcpy(c->data, pkt->data, pkt->datalen);
236
237         daemon_queue_send(client, &c->hdr);
238
239         talloc_free(c);
240 }
241
242 /*
243   destroy a ctdb_client
244 */
245 static int ctdb_client_destructor(struct ctdb_client *client)
246 {
247         struct ctdb_db_context *ctdb_db;
248
249         ctdb_takeover_client_destructor_hook(client);
250         reqid_remove(client->ctdb->idr, client->client_id);
251         client->ctdb->num_clients--;
252
253         if (client->num_persistent_updates != 0) {
254                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
256         }
257         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
258         if (ctdb_db) {
259                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260                                   "commit active. Forcing recovery.\n"));
261                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
262
263                 /*
264                  * trans3 transaction state:
265                  *
266                  * The destructor sets the pointer to NULL.
267                  */
268                 talloc_free(ctdb_db->persistent_state);
269         }
270
271         return 0;
272 }
273
274
275 /*
276   this is called when the ctdb daemon received a ctdb request message
277   from a local client over the unix domain socket
278  */
279 static void daemon_request_message_from_client(struct ctdb_client *client, 
280                                                struct ctdb_req_message_old *c)
281 {
282         TDB_DATA data;
283         int res;
284
285         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
287         }
288
289         /* maybe the message is for another client on this node */
290         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292                 return;
293         }
294
295         /* its for a remote node */
296         data.dptr = &c->data[0];
297         data.dsize = c->datalen;
298         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
299                                        c->srvid, data);
300         if (res != 0) {
301                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
302                          c->hdr.destnode));
303         }
304 }
305
306
307 struct daemon_call_state {
308         struct ctdb_client *client;
309         uint32_t reqid;
310         struct ctdb_call *call;
311         struct timeval start_time;
312
313         /* readonly request ? */
314         uint32_t readonly_fetch;
315         uint32_t client_callid;
316 };
317
318 /* 
319    complete a call from a client 
320 */
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
322 {
323         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
324                                                            struct daemon_call_state);
325         struct ctdb_reply_call_old *r;
326         int res;
327         uint32_t length;
328         struct ctdb_client *client = dstate->client;
329         struct ctdb_db_context *ctdb_db = state->ctdb_db;
330
331         talloc_steal(client, dstate);
332         talloc_steal(dstate, dstate->call);
333
334         res = ctdb_daemon_call_recv(state, dstate->call);
335         if (res != 0) {
336                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
338
339                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340                 return;
341         }
342
343         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344         /* If the client asked for readonly FETCH, we remapped this to 
345            FETCH_WITH_HEADER when calling the daemon. So we must
346            strip the extra header off the reply data before passing
347            it back to the client.
348         */
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 length -= sizeof(struct ctdb_ltdb_header);
352         }
353
354         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
355                                length, struct ctdb_reply_call_old);
356         if (r == NULL) {
357                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
360                 return;
361         }
362         r->hdr.reqid        = dstate->reqid;
363         r->status           = dstate->call->status;
364
365         if (dstate->readonly_fetch
366         && dstate->client_callid == CTDB_FETCH_FUNC) {
367                 /* client only asked for a FETCH so we must strip off
368                    the extra ctdb_ltdb header
369                 */
370                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
372         } else {
373                 r->datalen          = dstate->call->reply_data.dsize;
374                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
375         }
376
377         res = daemon_queue_send(client, &r->hdr);
378         if (res == -1) {
379                 /* client is dead - return immediately */
380                 return;
381         }
382         if (res != 0) {
383                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
384         }
385         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
387         talloc_free(dstate);
388 }
389
390 struct ctdb_daemon_packet_wrap {
391         struct ctdb_context *ctdb;
392         uint32_t client_id;
393 };
394
395 /*
396   a wrapper to catch disconnected clients
397  */
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
399 {
400         struct ctdb_client *client;
401         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
402                                                             struct ctdb_daemon_packet_wrap);
403         if (w == NULL) {
404                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405                 return;
406         }
407
408         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409         if (client == NULL) {
410                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
411                          w->client_id));
412                 talloc_free(w);
413                 return;
414         }
415         talloc_free(w);
416
417         /* process it */
418         daemon_incoming_packet(client, hdr);    
419 }
420
421 struct ctdb_deferred_fetch_call {
422         struct ctdb_deferred_fetch_call *next, *prev;
423         struct ctdb_req_call_old *c;
424         struct ctdb_daemon_packet_wrap *w;
425 };
426
427 struct ctdb_deferred_fetch_queue {
428         struct ctdb_deferred_fetch_call *deferred_calls;
429 };
430
431 struct ctdb_deferred_requeue {
432         struct ctdb_deferred_fetch_call *dfc;
433         struct ctdb_client *client;
434 };
435
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438                                     struct tevent_timer *te,
439                                     struct timeval t, void *private_data)
440 {
441         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442         struct ctdb_client *client = dfr->client;
443
444         talloc_steal(client, dfr->dfc->c);
445         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446         talloc_free(dfr);
447 }
448
449 /* the referral context is destroyed either after a timeout or when the initial
450    fetch-lock has finished.
451    at this stage, immediately start reprocessing the queued up deferred
452    calls so they get reprocessed immediately (and since we are dmaster at
453    this stage, trigger the waiting smbd processes to pick up and aquire the
454    record right away.
455 */
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
457 {
458
459         /* need to reprocess the packets from the queue explicitely instead of
460            just using a normal destructor since we want, need, to
461            call the clients in the same oder as the requests queued up
462         */
463         while (dfq->deferred_calls != NULL) {
464                 struct ctdb_client *client;
465                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466                 struct ctdb_deferred_requeue *dfr;
467
468                 DLIST_REMOVE(dfq->deferred_calls, dfc);
469
470                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471                 if (client == NULL) {
472                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
473                                  dfc->w->client_id));
474                         continue;
475                 }
476
477                 /* process it by pushing it back onto the eventloop */
478                 dfr = talloc(client, struct ctdb_deferred_requeue);
479                 if (dfr == NULL) {
480                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481                         continue;
482                 }
483
484                 dfr->dfc    = talloc_steal(dfr, dfc);
485                 dfr->client = client;
486
487                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488                                  reprocess_deferred_call, dfr);
489         }
490
491         return 0;
492 }
493
494 /* insert the new deferral context into the rb tree.
495    there should never be a pre-existing context here, but check for it
496    warn and destroy the previous context if there is already a deferral context
497    for this key.
498 */
499 static void *insert_dfq_callback(void *parm, void *data)
500 {
501         if (data) {
502                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
503                 talloc_free(data);
504         }
505         return parm;
506 }
507
508 /* if the original fetch-lock did not complete within a reasonable time,
509    free the context and context for all deferred requests to cause them to be
510    re-inserted into the event system.
511 */
512 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
513                         struct timeval t, void *private_data)
514 {
515         talloc_free(private_data);
516 }
517
518 /* This function is used in the local daemon to register a KEY in a database
519    for being "fetched"
520    While the remote fetch is in-flight, any futher attempts to re-fetch the
521    same record will be deferred until the fetch completes.
522 */
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
524 {
525         uint32_t *k;
526         struct ctdb_deferred_fetch_queue *dfq;
527
528         k = ctdb_key_to_idkey(call, call->key);
529         if (k == NULL) {
530                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531                 return -1;
532         }
533
534         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
535         if (dfq == NULL) {
536                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537                 talloc_free(k);
538                 return -1;
539         }
540         dfq->deferred_calls = NULL;
541
542         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
543
544         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
545
546         /* if the fetch havent completed in 30 seconds, just tear it all down
547            and let it try again as the events are reissued */
548         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
549                          dfq_timeout, dfq);
550
551         talloc_free(k);
552         return 0;
553 }
554
555 /* check if this is a duplicate request to a fetch already in-flight
556    if it is, make this call deferred to be reprocessed later when
557    the in-flight fetch completes.
558 */
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
560 {
561         uint32_t *k;
562         struct ctdb_deferred_fetch_queue *dfq;
563         struct ctdb_deferred_fetch_call *dfc;
564
565         k = ctdb_key_to_idkey(c, key);
566         if (k == NULL) {
567                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568                 return -1;
569         }
570
571         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
572         if (dfq == NULL) {
573                 talloc_free(k);
574                 return -1;
575         }
576
577
578         talloc_free(k);
579
580         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
581         if (dfc == NULL) {
582                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583                 return -1;
584         }
585
586         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587         if (dfc->w == NULL) {
588                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
589                 talloc_free(dfc);
590                 return -1;
591         }
592
593         dfc->c = talloc_steal(dfc, c);
594         dfc->w->ctdb = ctdb_db->ctdb;
595         dfc->w->client_id = client->client_id;
596
597         DLIST_ADD_END(dfq->deferred_calls, dfc);
598
599         return 0;
600 }
601
602
603 /*
604   this is called when the ctdb daemon received a ctdb request call
605   from a local client over the unix domain socket
606  */
607 static void daemon_request_call_from_client(struct ctdb_client *client, 
608                                             struct ctdb_req_call_old *c)
609 {
610         struct ctdb_call_state *state;
611         struct ctdb_db_context *ctdb_db;
612         struct daemon_call_state *dstate;
613         struct ctdb_call *call;
614         struct ctdb_ltdb_header header;
615         TDB_DATA key, data;
616         int ret;
617         struct ctdb_context *ctdb = client->ctdb;
618         struct ctdb_daemon_packet_wrap *w;
619
620         CTDB_INCREMENT_STAT(ctdb, total_calls);
621         CTDB_INCREMENT_STAT(ctdb, pending_calls);
622
623         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
624         if (!ctdb_db) {
625                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
626                           c->db_id));
627                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628                 return;
629         }
630
631         if (ctdb_db->unhealthy_reason) {
632                 /*
633                  * this is just a warning, as the tdb should be empty anyway,
634                  * and only persistent databases can be unhealthy, which doesn't
635                  * use this code patch
636                  */
637                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
639         }
640
641         key.dptr = c->data;
642         key.dsize = c->keylen;
643
644         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645         CTDB_NO_MEMORY_VOID(ctdb, w);   
646
647         w->ctdb = ctdb;
648         w->client_id = client->client_id;
649
650         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
651                                            (struct ctdb_req_header *)c, &data,
652                                            daemon_incoming_packet_wrap, w, true);
653         if (ret == -2) {
654                 /* will retry later */
655                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
656                 return;
657         }
658
659         talloc_free(w);
660
661         if (ret != 0) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664                 return;
665         }
666
667
668         /* check if this fetch request is a duplicate for a
669            request we already have in flight. If so defer it until
670            the first request completes.
671         */
672         if (ctdb->tunable.fetch_collapse == 1) {
673                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674                         ret = ctdb_ltdb_unlock(ctdb_db, key);
675                         if (ret != 0) {
676                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
677                         }
678                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
679                         talloc_free(data.dptr);
680                         return;
681                 }
682         }
683
684         /* Dont do READONLY if we don't have a tracking database */
685         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
686                 c->flags &= ~CTDB_WANT_READONLY;
687         }
688
689         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
690                 header.flags &= ~CTDB_REC_RO_FLAGS;
691                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
692                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
693                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
694                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
695                 }
696                 /* and clear out the tracking data */
697                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
698                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
699                 }
700         }
701
702         /* if we are revoking, we must defer all other calls until the revoke
703          * had completed.
704          */
705         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
706                 talloc_free(data.dptr);
707                 ret = ctdb_ltdb_unlock(ctdb_db, key);
708
709                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
710                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
711                 }
712                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
713                 return;
714         }
715
716         if ((header.dmaster == ctdb->pnn)
717         && (!(c->flags & CTDB_WANT_READONLY))
718         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
719                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
720                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
721                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
722                 }
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724
725                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
726                         ctdb_fatal(ctdb, "Failed to start record revoke");
727                 }
728                 talloc_free(data.dptr);
729
730                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
731                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
732                 }
733
734                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
735                 return;
736         }               
737
738         dstate = talloc(client, struct daemon_call_state);
739         if (dstate == NULL) {
740                 ret = ctdb_ltdb_unlock(ctdb_db, key);
741                 if (ret != 0) {
742                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
743                 }
744
745                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
746                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
747                 return;
748         }
749         dstate->start_time = timeval_current();
750         dstate->client = client;
751         dstate->reqid  = c->hdr.reqid;
752         talloc_steal(dstate, data.dptr);
753
754         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
755         if (call == NULL) {
756                 ret = ctdb_ltdb_unlock(ctdb_db, key);
757                 if (ret != 0) {
758                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
759                 }
760
761                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
762                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
763                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
764                 return;
765         }
766
767         dstate->readonly_fetch = 0;
768         call->call_id = c->callid;
769         call->key = key;
770         call->call_data.dptr = c->data + c->keylen;
771         call->call_data.dsize = c->calldatalen;
772         call->flags = c->flags;
773
774         if (c->flags & CTDB_WANT_READONLY) {
775                 /* client wants readonly record, so translate this into a 
776                    fetch with header. remember what the client asked for
777                    so we can remap the reply back to the proper format for
778                    the client in the reply
779                  */
780                 dstate->client_callid = call->call_id;
781                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
782                 dstate->readonly_fetch = 1;
783         }
784
785         if (header.dmaster == ctdb->pnn) {
786                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
787         } else {
788                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
789                 if (ctdb->tunable.fetch_collapse == 1) {
790                         /* This request triggered a remote fetch-lock.
791                            set up a deferral for this key so any additional
792                            fetch-locks are deferred until the current one
793                            finishes.
794                          */
795                         setup_deferred_fetch_locks(ctdb_db, call);
796                 }
797         }
798
799         ret = ctdb_ltdb_unlock(ctdb_db, key);
800         if (ret != 0) {
801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
802         }
803
804         if (state == NULL) {
805                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
806                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
807                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
808                 return;
809         }
810         talloc_steal(state, dstate);
811         talloc_steal(client, state);
812
813         state->async.fn = daemon_call_from_client_callback;
814         state->async.private_data = dstate;
815 }
816
817
818 static void daemon_request_control_from_client(struct ctdb_client *client, 
819                                                struct ctdb_req_control_old *c);
820 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
821                                               struct ctdb_req_tunnel_old *c);
822
823 /* data contains a packet from the client */
824 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
825 {
826         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
827         TALLOC_CTX *tmp_ctx;
828         struct ctdb_context *ctdb = client->ctdb;
829
830         /* place the packet as a child of a tmp_ctx. We then use
831            talloc_free() below to free it. If any of the calls want
832            to keep it, then they will steal it somewhere else, and the
833            talloc_free() will be a no-op */
834         tmp_ctx = talloc_new(client);
835         talloc_steal(tmp_ctx, hdr);
836
837         if (hdr->ctdb_magic != CTDB_MAGIC) {
838                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
839                 goto done;
840         }
841
842         if (hdr->ctdb_version != CTDB_PROTOCOL) {
843                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
844                 goto done;
845         }
846
847         switch (hdr->operation) {
848         case CTDB_REQ_CALL:
849                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
850                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
851                 break;
852
853         case CTDB_REQ_MESSAGE:
854                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
855                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
856                 break;
857
858         case CTDB_REQ_CONTROL:
859                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
860                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
861                 break;
862
863         case CTDB_REQ_TUNNEL:
864                 CTDB_INCREMENT_STAT(ctdb, client.req_tunnel);
865                 daemon_request_tunnel_from_client(client, (struct ctdb_req_tunnel_old *)hdr);
866                 break;
867
868         default:
869                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
870                          hdr->operation));
871         }
872
873 done:
874         talloc_free(tmp_ctx);
875 }
876
877 /*
878   called when the daemon gets a incoming packet
879  */
880 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
881 {
882         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
883         struct ctdb_req_header *hdr;
884
885         if (cnt == 0) {
886                 talloc_free(client);
887                 return;
888         }
889
890         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
891
892         if (cnt < sizeof(*hdr)) {
893                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
894                                (unsigned)cnt);
895                 return;
896         }
897         hdr = (struct ctdb_req_header *)data;
898
899         if (hdr->ctdb_magic != CTDB_MAGIC) {
900                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
901                 goto err_out;
902         }
903
904         if (hdr->ctdb_version != CTDB_PROTOCOL) {
905                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
906                 goto err_out;
907         }
908
909         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
910                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
911                  hdr->srcnode, hdr->destnode));
912
913         /* it is the responsibility of the incoming packet function to free 'data' */
914         daemon_incoming_packet(client, hdr);
915         return;
916
917 err_out:
918         TALLOC_FREE(data);
919 }
920
921
922 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
923 {
924         if (client_pid->ctdb->client_pids != NULL) {
925                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
926         }
927
928         return 0;
929 }
930
931
932 static void ctdb_accept_client(struct tevent_context *ev,
933                                struct tevent_fd *fde, uint16_t flags,
934                                void *private_data)
935 {
936         struct sockaddr_un addr;
937         socklen_t len;
938         int fd;
939         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
940         struct ctdb_client *client;
941         struct ctdb_client_pid_list *client_pid;
942         pid_t peer_pid = 0;
943         int ret;
944
945         memset(&addr, 0, sizeof(addr));
946         len = sizeof(addr);
947         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
948         if (fd == -1) {
949                 return;
950         }
951         smb_set_close_on_exec(fd);
952
953         ret = set_blocking(fd, false);
954         if (ret != 0) {
955                 DEBUG(DEBUG_ERR,
956                       (__location__
957                        " failed to set socket non-blocking (%s)\n",
958                        strerror(errno)));
959                 close(fd);
960                 return;
961         }
962
963         set_close_on_exec(fd);
964
965         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
966
967         client = talloc_zero(ctdb, struct ctdb_client);
968         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
969                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
970         }
971
972         client->ctdb = ctdb;
973         client->fd = fd;
974         client->client_id = reqid_new(ctdb->idr, client);
975         client->pid = peer_pid;
976
977         client_pid = talloc(client, struct ctdb_client_pid_list);
978         if (client_pid == NULL) {
979                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
980                 close(fd);
981                 talloc_free(client);
982                 return;
983         }               
984         client_pid->ctdb   = ctdb;
985         client_pid->pid    = peer_pid;
986         client_pid->client = client;
987
988         DLIST_ADD(ctdb->client_pids, client_pid);
989
990         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
991                                          ctdb_daemon_read_cb, client,
992                                          "client-%u", client->pid);
993
994         talloc_set_destructor(client, ctdb_client_destructor);
995         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
996         ctdb->num_clients++;
997 }
998
999
1000
1001 /*
1002   create a unix domain socket and bind it
1003   return a file descriptor open on the socket 
1004 */
1005 static int ux_socket_bind(struct ctdb_context *ctdb)
1006 {
1007         struct sockaddr_un addr;
1008         int ret;
1009
1010         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1011         if (ctdb->daemon.sd == -1) {
1012                 return -1;
1013         }
1014
1015         memset(&addr, 0, sizeof(addr));
1016         addr.sun_family = AF_UNIX;
1017         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1018
1019         if (! sock_clean(ctdb->daemon.name)) {
1020                 return -1;
1021         }
1022
1023         set_close_on_exec(ctdb->daemon.sd);
1024
1025         ret = set_blocking(ctdb->daemon.sd, false);
1026         if (ret != 0) {
1027                 DEBUG(DEBUG_ERR,
1028                       (__location__
1029                        " failed to set socket non-blocking (%s)\n",
1030                        strerror(errno)));
1031                 goto failed;
1032         }
1033
1034         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1035                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1036                 goto failed;
1037         }
1038
1039         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1040             chmod(ctdb->daemon.name, 0700) != 0) {
1041                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1042                 goto failed;
1043         }
1044
1045
1046         if (listen(ctdb->daemon.sd, 100) != 0) {
1047                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1048                 goto failed;
1049         }
1050
1051         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1052                              ctdb->daemon.name));
1053         return 0;
1054
1055 failed:
1056         close(ctdb->daemon.sd);
1057         ctdb->daemon.sd = -1;
1058         return -1;      
1059 }
1060
1061 static void initialise_node_flags (struct ctdb_context *ctdb)
1062 {
1063         if (ctdb->pnn == -1) {
1064                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1065         }
1066
1067         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1068
1069         /* do we start out in DISABLED mode? */
1070         if (ctdb->start_as_disabled != 0) {
1071                 DEBUG(DEBUG_ERR,
1072                       ("This node is configured to start in DISABLED state\n"));
1073                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1074         }
1075         /* do we start out in STOPPED mode? */
1076         if (ctdb->start_as_stopped != 0) {
1077                 DEBUG(DEBUG_ERR,
1078                       ("This node is configured to start in STOPPED state\n"));
1079                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1080         }
1081 }
1082
1083 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1084                                       void *private_data)
1085 {
1086         if (status != 0) {
1087                 ctdb_die(ctdb, "Failed to run setup event");
1088         }
1089         ctdb_run_notification_script(ctdb, "setup");
1090
1091         /* Start the recovery daemon */
1092         if (ctdb_start_recoverd(ctdb) != 0) {
1093                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1094                 exit(11);
1095         }
1096
1097         ctdb_start_periodic_events(ctdb);
1098
1099         ctdb_wait_for_first_recovery(ctdb);
1100 }
1101
1102 static struct timeval tevent_before_wait_ts;
1103 static struct timeval tevent_after_wait_ts;
1104
1105 static void ctdb_tevent_trace_init(void)
1106 {
1107         struct timeval now;
1108
1109         now = timeval_current();
1110
1111         tevent_before_wait_ts = now;
1112         tevent_after_wait_ts = now;
1113 }
1114
1115 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1116                               void *private_data)
1117 {
1118         struct timeval diff;
1119         struct timeval now;
1120         struct ctdb_context *ctdb =
1121                 talloc_get_type(private_data, struct ctdb_context);
1122
1123         if (getpid() != ctdb->ctdbd_pid) {
1124                 return;
1125         }
1126
1127         now = timeval_current();
1128
1129         switch (tp) {
1130         case TEVENT_TRACE_BEFORE_WAIT:
1131                 diff = timeval_until(&tevent_after_wait_ts, &now);
1132                 if (diff.tv_sec > 3) {
1133                         DEBUG(DEBUG_ERR,
1134                               ("Handling event took %ld seconds!\n",
1135                                (long)diff.tv_sec));
1136                 }
1137                 tevent_before_wait_ts = now;
1138                 break;
1139
1140         case TEVENT_TRACE_AFTER_WAIT:
1141                 diff = timeval_until(&tevent_before_wait_ts, &now);
1142                 if (diff.tv_sec > 3) {
1143                         DEBUG(DEBUG_ERR,
1144                               ("No event for %ld seconds!\n",
1145                                (long)diff.tv_sec));
1146                 }
1147                 tevent_after_wait_ts = now;
1148                 break;
1149
1150         default:
1151                 /* Do nothing for future tevent trace points */ ;
1152         }
1153 }
1154
1155 static void ctdb_remove_pidfile(void)
1156 {
1157         TALLOC_FREE(ctdbd_pidfile_ctx);
1158 }
1159
1160 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1161 {
1162         if (ctdbd_pidfile != NULL) {
1163                 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1164                                                  &ctdbd_pidfile_ctx);
1165                 if (ret != 0) {
1166                         DEBUG(DEBUG_ERR,
1167                               ("Failed to create PID file %s\n",
1168                                ctdbd_pidfile));
1169                         exit(11);
1170                 }
1171
1172                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1173                 atexit(ctdb_remove_pidfile);
1174         }
1175 }
1176
1177 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1178 {
1179         int i, j, count;
1180
1181         /* initialize the vnn mapping table, skipping any deleted nodes */
1182         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1183         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1184
1185         count = 0;
1186         for (i = 0; i < ctdb->num_nodes; i++) {
1187                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1188                         count++;
1189                 }
1190         }
1191
1192         ctdb->vnn_map->generation = INVALID_GENERATION;
1193         ctdb->vnn_map->size = count;
1194         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1195         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1196
1197         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1198                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1199                         continue;
1200                 }
1201                 ctdb->vnn_map->map[j] = i;
1202                 j++;
1203         }
1204 }
1205
1206 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1207 {
1208         int nodeid;
1209
1210         if (ctdb->address == NULL) {
1211                 ctdb_fatal(ctdb,
1212                            "Can not determine PNN - node address is not set\n");
1213         }
1214
1215         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1216         if (nodeid == -1) {
1217                 ctdb_fatal(ctdb,
1218                            "Can not determine PNN - node address not found in node list\n");
1219         }
1220
1221         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1222         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1223 }
1224
1225 /*
1226   start the protocol going as a daemon
1227 */
1228 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1229 {
1230         int res, ret = -1;
1231         struct tevent_fd *fde;
1232
1233         become_daemon(do_fork, false, false);
1234
1235         ignore_signal(SIGPIPE);
1236         ignore_signal(SIGUSR1);
1237
1238         ctdb->ctdbd_pid = getpid();
1239         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1240                           ctdb_version_string, ctdb->ctdbd_pid));
1241         ctdb_create_pidfile(ctdb);
1242
1243         /* create a unix domain stream socket to listen to */
1244         res = ux_socket_bind(ctdb);
1245         if (res!=0) {
1246                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1247                 exit(10);
1248         }
1249
1250         /* Make sure we log something when the daemon terminates.
1251          * This must be the first exit handler to run (so the last to
1252          * be registered.
1253          */
1254         __ctdbd_pid = getpid();
1255         atexit(print_exit_message);
1256
1257         if (ctdb->do_setsched) {
1258                 /* try to set us up as realtime */
1259                 if (!set_scheduler()) {
1260                         exit(1);
1261                 }
1262                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1263         }
1264
1265         ctdb->ev = tevent_context_init(NULL);
1266         if (ctdb->ev == NULL) {
1267                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1268                 exit(1);
1269         }
1270         tevent_loop_allow_nesting(ctdb->ev);
1271         ctdb_tevent_trace_init();
1272         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1273
1274         /* set up a handler to pick up sigchld */
1275         if (ctdb_init_sigchld(ctdb) == NULL) {
1276                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1277                 exit(1);
1278         }
1279
1280         if (do_fork) {
1281                 ctdb_set_child_logging(ctdb);
1282         }
1283
1284         TALLOC_FREE(ctdb->srv);
1285         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1286                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1287                 exit(1);
1288         }
1289
1290         TALLOC_FREE(ctdb->tunnels);
1291         if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
1292                 DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
1293                 exit(1);
1294         }
1295
1296         /* initialize statistics collection */
1297         ctdb_statistics_init(ctdb);
1298
1299         /* force initial recovery for election */
1300         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1301
1302         if (ctdb_start_eventd(ctdb) != 0) {
1303                 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1304                 exit(1);
1305         }
1306
1307         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1308         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1309         if (ret != 0) {
1310                 ctdb_die(ctdb, "Failed to run init event\n");
1311         }
1312         ctdb_run_notification_script(ctdb, "init");
1313
1314         if (strcmp(ctdb->transport, "tcp") == 0) {
1315                 ret = ctdb_tcp_init(ctdb);
1316         }
1317 #ifdef USE_INFINIBAND
1318         if (strcmp(ctdb->transport, "ib") == 0) {
1319                 ret = ctdb_ibw_init(ctdb);
1320         }
1321 #endif
1322         if (ret != 0) {
1323                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1324                 return -1;
1325         }
1326
1327         if (ctdb->methods == NULL) {
1328                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1329                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1330         }
1331
1332         /* Initialise the transport.  This sets the node address if it
1333          * was not set via the command-line. */
1334         if (ctdb->methods->initialise(ctdb) != 0) {
1335                 ctdb_fatal(ctdb, "transport failed to initialise");
1336         }
1337
1338         ctdb_set_my_pnn(ctdb);
1339
1340         initialise_node_flags(ctdb);
1341
1342         ret = ctdb_set_public_addresses(ctdb, true);
1343         if (ret == -1) {
1344                 D_ERR("Unable to setup public IP addresses\n");
1345                 exit(1);
1346         }
1347
1348         ctdb_initialise_vnn_map(ctdb);
1349
1350         /* attach to existing databases */
1351         if (ctdb_attach_databases(ctdb) != 0) {
1352                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1353         }
1354
1355         /* start frozen, then let the first election sort things out */
1356         if (!ctdb_blocking_freeze(ctdb)) {
1357                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1358         }
1359
1360         /* now start accepting clients, only can do this once frozen */
1361         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1362                             ctdb_accept_client, ctdb);
1363         if (fde == NULL) {
1364                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1365         }
1366         tevent_fd_set_auto_close(fde);
1367
1368         /* Start the transport */
1369         if (ctdb->methods->start(ctdb) != 0) {
1370                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1371                 ctdb_fatal(ctdb, "transport failed to start");
1372         }
1373
1374         /* Recovery daemon and timed events are started from the
1375          * callback, only after the setup event completes
1376          * successfully.
1377          */
1378         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1379         ret = ctdb_event_script_callback(ctdb,
1380                                          ctdb,
1381                                          ctdb_setup_event_callback,
1382                                          ctdb,
1383                                          CTDB_EVENT_SETUP,
1384                                          "%s",
1385                                          "");
1386         if (ret != 0) {
1387                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1388                 exit(1);
1389         }
1390
1391         lockdown_memory(ctdb->valgrinding);
1392
1393         /* go into a wait loop to allow other nodes to complete */
1394         tevent_loop_wait(ctdb->ev);
1395
1396         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1397         exit(1);
1398 }
1399
1400 /*
1401   allocate a packet for use in daemon<->daemon communication
1402  */
1403 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1404                                                  TALLOC_CTX *mem_ctx, 
1405                                                  enum ctdb_operation operation, 
1406                                                  size_t length, size_t slength,
1407                                                  const char *type)
1408 {
1409         int size;
1410         struct ctdb_req_header *hdr;
1411
1412         length = MAX(length, slength);
1413         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1414
1415         if (ctdb->methods == NULL) {
1416                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1417                          operation, (unsigned)length));
1418                 return NULL;
1419         }
1420
1421         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1422         if (hdr == NULL) {
1423                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1424                          operation, (unsigned)length));
1425                 return NULL;
1426         }
1427         talloc_set_name_const(hdr, type);
1428         memset(hdr, 0, slength);
1429         hdr->length       = length;
1430         hdr->operation    = operation;
1431         hdr->ctdb_magic   = CTDB_MAGIC;
1432         hdr->ctdb_version = CTDB_PROTOCOL;
1433         hdr->generation   = ctdb->vnn_map->generation;
1434         hdr->srcnode      = ctdb->pnn;
1435
1436         return hdr;     
1437 }
1438
1439 struct daemon_control_state {
1440         struct daemon_control_state *next, *prev;
1441         struct ctdb_client *client;
1442         struct ctdb_req_control_old *c;
1443         uint32_t reqid;
1444         struct ctdb_node *node;
1445 };
1446
1447 /*
1448   callback when a control reply comes in
1449  */
1450 static void daemon_control_callback(struct ctdb_context *ctdb,
1451                                     int32_t status, TDB_DATA data, 
1452                                     const char *errormsg,
1453                                     void *private_data)
1454 {
1455         struct daemon_control_state *state = talloc_get_type(private_data, 
1456                                                              struct daemon_control_state);
1457         struct ctdb_client *client = state->client;
1458         struct ctdb_reply_control_old *r;
1459         size_t len;
1460         int ret;
1461
1462         /* construct a message to send to the client containing the data */
1463         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1464         if (errormsg) {
1465                 len += strlen(errormsg);
1466         }
1467         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1468                                struct ctdb_reply_control_old);
1469         CTDB_NO_MEMORY_VOID(ctdb, r);
1470
1471         r->hdr.reqid     = state->reqid;
1472         r->status        = status;
1473         r->datalen       = data.dsize;
1474         r->errorlen = 0;
1475         memcpy(&r->data[0], data.dptr, data.dsize);
1476         if (errormsg) {
1477                 r->errorlen = strlen(errormsg);
1478                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1479         }
1480
1481         ret = daemon_queue_send(client, &r->hdr);
1482         if (ret != -1) {
1483                 talloc_free(state);
1484         }
1485 }
1486
1487 /*
1488   fail all pending controls to a disconnected node
1489  */
1490 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1491 {
1492         struct daemon_control_state *state;
1493         while ((state = node->pending_controls)) {
1494                 DLIST_REMOVE(node->pending_controls, state);
1495                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1496                                         "node is disconnected", state);
1497         }
1498 }
1499
1500 /*
1501   destroy a daemon_control_state
1502  */
1503 static int daemon_control_destructor(struct daemon_control_state *state)
1504 {
1505         if (state->node) {
1506                 DLIST_REMOVE(state->node->pending_controls, state);
1507         }
1508         return 0;
1509 }
1510
1511 /*
1512   this is called when the ctdb daemon received a ctdb request control
1513   from a local client over the unix domain socket
1514  */
1515 static void daemon_request_control_from_client(struct ctdb_client *client, 
1516                                                struct ctdb_req_control_old *c)
1517 {
1518         TDB_DATA data;
1519         int res;
1520         struct daemon_control_state *state;
1521         TALLOC_CTX *tmp_ctx = talloc_new(client);
1522
1523         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1524                 c->hdr.destnode = client->ctdb->pnn;
1525         }
1526
1527         state = talloc(client, struct daemon_control_state);
1528         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1529
1530         state->client = client;
1531         state->c = talloc_steal(state, c);
1532         state->reqid = c->hdr.reqid;
1533         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1534                 state->node = client->ctdb->nodes[c->hdr.destnode];
1535                 DLIST_ADD(state->node->pending_controls, state);
1536         } else {
1537                 state->node = NULL;
1538         }
1539
1540         talloc_set_destructor(state, daemon_control_destructor);
1541
1542         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1543                 talloc_steal(tmp_ctx, state);
1544         }
1545         
1546         data.dptr = &c->data[0];
1547         data.dsize = c->datalen;
1548         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1549                                        c->srvid, c->opcode, client->client_id,
1550                                        c->flags,
1551                                        data, daemon_control_callback,
1552                                        state);
1553         if (res != 0) {
1554                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1555                          c->hdr.destnode));
1556         }
1557
1558         talloc_free(tmp_ctx);
1559 }
1560
1561 static void daemon_request_tunnel_from_client(struct ctdb_client *client,
1562                                               struct ctdb_req_tunnel_old *c)
1563 {
1564         TDB_DATA data;
1565         int ret;
1566
1567         if (! ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1568                 DEBUG(DEBUG_ERR, ("Invalid destination 0x%x\n",
1569                                   c->hdr.destnode));
1570                 return;
1571         }
1572
1573         ret = srvid_exists(client->ctdb->tunnels, c->tunnel_id, NULL);
1574         if (ret != 0) {
1575                 DEBUG(DEBUG_ERR,
1576                       ("tunnel id 0x%"PRIx64" not registered, dropping pkt\n",
1577                        c->tunnel_id));
1578                 return;
1579         }
1580
1581         data = (TDB_DATA) {
1582                 .dsize = c->datalen,
1583                 .dptr = &c->data[0],
1584         };
1585
1586         ret = ctdb_daemon_send_tunnel(client->ctdb, c->hdr.destnode,
1587                                       c->tunnel_id, c->flags, data);
1588         if (ret != 0) {
1589                 DEBUG(DEBUG_ERR, ("Failed to set tunnel to remote note %u\n",
1590                                   c->hdr.destnode));
1591         }
1592 }
1593
1594 /*
1595   register a call function
1596 */
1597 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1598                          ctdb_fn_t fn, int id)
1599 {
1600         struct ctdb_registered_call *call;
1601         struct ctdb_db_context *ctdb_db;
1602
1603         ctdb_db = find_ctdb_db(ctdb, db_id);
1604         if (ctdb_db == NULL) {
1605                 return -1;
1606         }
1607
1608         call = talloc(ctdb_db, struct ctdb_registered_call);
1609         call->fn = fn;
1610         call->id = id;
1611
1612         DLIST_ADD(ctdb_db->calls, call);        
1613         return 0;
1614 }
1615
1616
1617
1618 /*
1619   this local messaging handler is ugly, but is needed to prevent
1620   recursion in ctdb_send_message() when the destination node is the
1621   same as the source node
1622  */
1623 struct ctdb_local_message {
1624         struct ctdb_context *ctdb;
1625         uint64_t srvid;
1626         TDB_DATA data;
1627 };
1628
1629 static void ctdb_local_message_trigger(struct tevent_context *ev,
1630                                        struct tevent_timer *te,
1631                                        struct timeval t, void *private_data)
1632 {
1633         struct ctdb_local_message *m = talloc_get_type(
1634                 private_data, struct ctdb_local_message);
1635
1636         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1637         talloc_free(m);
1638 }
1639
1640 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1641 {
1642         struct ctdb_local_message *m;
1643         m = talloc(ctdb, struct ctdb_local_message);
1644         CTDB_NO_MEMORY(ctdb, m);
1645
1646         m->ctdb = ctdb;
1647         m->srvid = srvid;
1648         m->data  = data;
1649         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1650         if (m->data.dptr == NULL) {
1651                 talloc_free(m);
1652                 return -1;
1653         }
1654
1655         /* this needs to be done as an event to prevent recursion */
1656         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1657                          ctdb_local_message_trigger, m);
1658         return 0;
1659 }
1660
1661 /*
1662   send a ctdb message
1663 */
1664 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1665                              uint64_t srvid, TDB_DATA data)
1666 {
1667         struct ctdb_req_message_old *r;
1668         int len;
1669
1670         if (ctdb->methods == NULL) {
1671                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1672                 return -1;
1673         }
1674
1675         /* see if this is a message to ourselves */
1676         if (pnn == ctdb->pnn) {
1677                 return ctdb_local_message(ctdb, srvid, data);
1678         }
1679
1680         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1681         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1682                                     struct ctdb_req_message_old);
1683         CTDB_NO_MEMORY(ctdb, r);
1684
1685         r->hdr.destnode  = pnn;
1686         r->srvid         = srvid;
1687         r->datalen       = data.dsize;
1688         memcpy(&r->data[0], data.dptr, data.dsize);
1689
1690         ctdb_queue_packet(ctdb, &r->hdr);
1691
1692         talloc_free(r);
1693         return 0;
1694 }
1695
1696
1697
1698 struct ctdb_client_notify_list {
1699         struct ctdb_client_notify_list *next, *prev;
1700         struct ctdb_context *ctdb;
1701         uint64_t srvid;
1702         TDB_DATA data;
1703 };
1704
1705
1706 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1707 {
1708         int ret;
1709
1710         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1711
1712         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1713         if (ret != 0) {
1714                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1715         }
1716
1717         return 0;
1718 }
1719
1720 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1721 {
1722         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1723         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1724         struct ctdb_client_notify_list *nl;
1725
1726         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1727
1728         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1729                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1730                 return -1;
1731         }
1732
1733         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1734                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1735                 return -1;
1736         }
1737
1738
1739         if (client == NULL) {
1740                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1741                 return -1;
1742         }
1743
1744         for(nl=client->notify; nl; nl=nl->next) {
1745                 if (nl->srvid == notify->srvid) {
1746                         break;
1747                 }
1748         }
1749         if (nl != NULL) {
1750                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1751                 return -1;
1752         }
1753
1754         nl = talloc(client, struct ctdb_client_notify_list);
1755         CTDB_NO_MEMORY(ctdb, nl);
1756         nl->ctdb       = ctdb;
1757         nl->srvid      = notify->srvid;
1758         nl->data.dsize = notify->len;
1759         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1760                                        nl->data.dsize);
1761         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1762         
1763         DLIST_ADD(client->notify, nl);
1764         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1765
1766         return 0;
1767 }
1768
1769 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1770 {
1771         uint64_t srvid = *(uint64_t *)indata.dptr;
1772         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1773         struct ctdb_client_notify_list *nl;
1774
1775         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1776
1777         if (client == NULL) {
1778                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1779                 return -1;
1780         }
1781
1782         for(nl=client->notify; nl; nl=nl->next) {
1783                 if (nl->srvid == srvid) {
1784                         break;
1785                 }
1786         }
1787         if (nl == NULL) {
1788                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1789                 return -1;
1790         }
1791
1792         DLIST_REMOVE(client->notify, nl);
1793         talloc_set_destructor(nl, NULL);
1794         talloc_free(nl);
1795
1796         return 0;
1797 }
1798
1799 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1800 {
1801         struct ctdb_client_pid_list *client_pid;
1802
1803         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1804                 if (client_pid->pid == pid) {
1805                         return client_pid->client;
1806                 }
1807         }
1808         return NULL;
1809 }
1810
1811
1812 /* This control is used by samba when probing if a process (of a samba daemon)
1813    exists on the node.
1814    Samba does this when it needs/wants to check if a subrecord in one of the
1815    databases is still valid, or if it is stale and can be removed.
1816    If the node is in unhealthy or stopped state we just kill of the samba
1817    process holding this sub-record and return to the calling samba that
1818    the process does not exist.
1819    This allows us to forcefully recall subrecords registered by samba processes
1820    on banned and stopped nodes.
1821 */
1822 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1823 {
1824         struct ctdb_client *client;
1825
1826         client = ctdb_find_client_by_pid(ctdb, pid);
1827         if (client == NULL) {
1828                 return -1;
1829         }
1830
1831         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1832                 DEBUG(DEBUG_NOTICE,
1833                       ("Killing client with pid:%d on banned/stopped node\n",
1834                        (int)pid));
1835                 talloc_free(client);
1836                 return -1;
1837         }
1838
1839         return kill(pid, 0);
1840 }
1841
1842 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
1843                                      TDB_DATA indata)
1844 {
1845         struct ctdb_client_pid_list *client_pid;
1846         pid_t pid;
1847         uint64_t srvid;
1848         int ret;
1849
1850         pid = *(pid_t *)indata.dptr;
1851         srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
1852
1853         for (client_pid = ctdb->client_pids;
1854              client_pid != NULL;
1855              client_pid = client_pid->next) {
1856                 if (client_pid->pid == pid) {
1857                         ret = srvid_exists(ctdb->srv, srvid,
1858                                            client_pid->client);
1859                         if (ret == 0) {
1860                                 return 0;
1861                         }
1862                 }
1863         }
1864
1865         return -1;
1866 }
1867
1868 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1869 {
1870         struct ctdb_node_map_old *node_map = NULL;
1871
1872         CHECK_CONTROL_DATA_SIZE(0);
1873
1874         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1875         if (node_map == NULL) {
1876                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1877                 return -1;
1878         }
1879
1880         outdata->dptr  = (unsigned char *)node_map;
1881         outdata->dsize = talloc_get_size(outdata->dptr);
1882
1883         return 0;
1884 }
1885
1886 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1887 {
1888         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1889                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1890                 return;
1891         }
1892
1893         DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1894         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1895         ctdb_stop_recoverd(ctdb);
1896         ctdb_stop_keepalive(ctdb);
1897         ctdb_stop_monitoring(ctdb);
1898         ctdb_release_all_ips(ctdb);
1899         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1900         ctdb_stop_eventd(ctdb);
1901         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1902                 ctdb->methods->shutdown(ctdb);
1903         }
1904
1905         DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1906         exit(exit_code);
1907 }
1908
1909 /* When forking the main daemon and the child process needs to connect
1910  * back to the daemon as a client process, this function can be used
1911  * to change the ctdb context from daemon into client mode.  The child
1912  * process must be created using ctdb_fork() and not fork() -
1913  * ctdb_fork() does some necessary housekeeping.
1914  */
1915 int switch_from_server_to_client(struct ctdb_context *ctdb)
1916 {
1917         int ret;
1918
1919         /* get a new event context */
1920         ctdb->ev = tevent_context_init(ctdb);
1921         if (ctdb->ev == NULL) {
1922                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1923                 exit(1);
1924         }
1925         tevent_loop_allow_nesting(ctdb->ev);
1926
1927         /* Connect to main CTDB daemon */
1928         ret = ctdb_socket_connect(ctdb);
1929         if (ret != 0) {
1930                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1931                 return -1;
1932         }
1933
1934         ctdb->can_send_controls = true;
1935
1936         return 0;
1937 }