dc2fd5d9695c3e50c4b70029514b8eef4f172fa7
[metze/samba/wip.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/time.h"
36 #include "lib/util/blocking.h"
37 #include "lib/util/become_daemon.h"
38
39 #include "common/version.h"
40 #include "ctdb_private.h"
41 #include "ctdb_client.h"
42
43 #include "common/rb_tree.h"
44 #include "common/reqid.h"
45 #include "common/system.h"
46 #include "common/common.h"
47 #include "common/logging.h"
48 #include "common/pidfile.h"
49 #include "common/sock_io.h"
50
51 struct ctdb_client_pid_list {
52         struct ctdb_client_pid_list *next, *prev;
53         struct ctdb_context *ctdb;
54         pid_t pid;
55         struct ctdb_client *client;
56 };
57
58 const char *ctdbd_pidfile = NULL;
59 static struct pidfile_context *ctdbd_pidfile_ctx = NULL;
60
61 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
62
63 static pid_t __ctdbd_pid;
64
65 static void print_exit_message(void)
66 {
67         if (getpid() == __ctdbd_pid) {
68                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
69
70                 /* Wait a second to allow pending log messages to be flushed */
71                 sleep(1);
72         }
73 }
74
75
76
77 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
78                                   struct timeval t, void *private_data)
79 {
80         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
81
82         if (getpid() != ctdb->ctdbd_pid) {
83                 return;
84         }
85
86         tevent_add_timer(ctdb->ev, ctdb,
87                          timeval_current_ofs(1, 0),
88                          ctdb_time_tick, ctdb);
89 }
90
91 /* Used to trigger a dummy event once per second, to make
92  * detection of hangs more reliable.
93  */
94 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
95 {
96         tevent_add_timer(ctdb->ev, ctdb,
97                          timeval_current_ofs(1, 0),
98                          ctdb_time_tick, ctdb);
99 }
100
101 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
102 {
103         /* start monitoring for connected/disconnected nodes */
104         ctdb_start_keepalive(ctdb);
105
106         /* start periodic update of tcp tickle lists */
107         ctdb_start_tcp_tickle_update(ctdb);
108
109         /* start listening for recovery daemon pings */
110         ctdb_control_recd_ping(ctdb);
111
112         /* start listening to timer ticks */
113         ctdb_start_time_tickd(ctdb);
114 }
115
116 static void ignore_signal(int signum)
117 {
118         struct sigaction act;
119
120         memset(&act, 0, sizeof(act));
121
122         act.sa_handler = SIG_IGN;
123         sigemptyset(&act.sa_mask);
124         sigaddset(&act.sa_mask, signum);
125         sigaction(signum, &act, NULL);
126 }
127
128
129 /*
130   send a packet to a client
131  */
132 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
133 {
134         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
135         if (hdr->operation == CTDB_REQ_MESSAGE) {
136                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
137                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
138                         talloc_free(client);
139                         return -1;
140                 }
141         }
142         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
143 }
144
145 /*
146   message handler for when we are in daemon mode. This redirects the message
147   to the right client
148  */
149 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
150                                    void *private_data)
151 {
152         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
153         struct ctdb_req_message_old *r;
154         int len;
155
156         /* construct a message to send to the client containing the data */
157         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
158         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
159                                len, struct ctdb_req_message_old);
160         CTDB_NO_MEMORY_VOID(client->ctdb, r);
161
162         talloc_set_name_const(r, "req_message packet");
163
164         r->srvid         = srvid;
165         r->datalen       = data.dsize;
166         memcpy(&r->data[0], data.dptr, data.dsize);
167
168         daemon_queue_send(client, &r->hdr);
169
170         talloc_free(r);
171 }
172
173 /*
174   this is called when the ctdb daemon received a ctdb request to 
175   set the srvid from the client
176  */
177 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
178 {
179         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
180         int res;
181         if (client == NULL) {
182                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
183                 return -1;
184         }
185         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
186                              client);
187         if (res != 0) {
188                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
189                          (unsigned long long)srvid));
190         } else {
191                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
192                          (unsigned long long)srvid));
193         }
194
195         return res;
196 }
197
198 /*
199   this is called when the ctdb daemon received a ctdb request to 
200   remove a srvid from the client
201  */
202 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
203 {
204         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
205         if (client == NULL) {
206                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
207                 return -1;
208         }
209         return srvid_deregister(ctdb->srv, srvid, client);
210 }
211
212 void daemon_tunnel_handler(uint64_t tunnel_id, TDB_DATA data,
213                            void *private_data)
214 {
215         struct ctdb_client *client =
216                 talloc_get_type_abort(private_data, struct ctdb_client);
217         struct ctdb_req_tunnel_old *c, *pkt;
218         size_t len;
219
220         pkt = (struct ctdb_req_tunnel_old *)data.dptr;
221
222         len = offsetof(struct ctdb_req_tunnel_old, data) + pkt->datalen;
223         c = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_TUNNEL,
224                                len, struct ctdb_req_tunnel_old);
225         if (c == NULL) {
226                 DEBUG(DEBUG_ERR, ("Memory error in daemon_tunnel_handler\n"));
227                 return;
228         }
229
230         talloc_set_name_const(c, "req_tunnel packet");
231
232         c->tunnel_id = tunnel_id;
233         c->flags = pkt->flags;
234         c->datalen = pkt->datalen;
235         memcpy(c->data, pkt->data, pkt->datalen);
236
237         daemon_queue_send(client, &c->hdr);
238
239         talloc_free(c);
240 }
241
242 /*
243   destroy a ctdb_client
244 */
245 static int ctdb_client_destructor(struct ctdb_client *client)
246 {
247         struct ctdb_db_context *ctdb_db;
248
249         ctdb_takeover_client_destructor_hook(client);
250         reqid_remove(client->ctdb->idr, client->client_id);
251         client->ctdb->num_clients--;
252
253         if (client->num_persistent_updates != 0) {
254                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
255                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
256         }
257         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
258         if (ctdb_db) {
259                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
260                                   "commit active. Forcing recovery.\n"));
261                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
262
263                 /*
264                  * trans3 transaction state:
265                  *
266                  * The destructor sets the pointer to NULL.
267                  */
268                 talloc_free(ctdb_db->persistent_state);
269         }
270
271         return 0;
272 }
273
274
275 /*
276   this is called when the ctdb daemon received a ctdb request message
277   from a local client over the unix domain socket
278  */
279 static void daemon_request_message_from_client(struct ctdb_client *client, 
280                                                struct ctdb_req_message_old *c)
281 {
282         TDB_DATA data;
283         int res;
284
285         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
286                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
287         }
288
289         /* maybe the message is for another client on this node */
290         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
291                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
292                 return;
293         }
294
295         /* its for a remote node */
296         data.dptr = &c->data[0];
297         data.dsize = c->datalen;
298         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
299                                        c->srvid, data);
300         if (res != 0) {
301                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
302                          c->hdr.destnode));
303         }
304 }
305
306
307 struct daemon_call_state {
308         struct ctdb_client *client;
309         uint32_t reqid;
310         struct ctdb_call *call;
311         struct timeval start_time;
312
313         /* readonly request ? */
314         uint32_t readonly_fetch;
315         uint32_t client_callid;
316 };
317
318 /* 
319    complete a call from a client 
320 */
321 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
322 {
323         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
324                                                            struct daemon_call_state);
325         struct ctdb_reply_call_old *r;
326         int res;
327         uint32_t length;
328         struct ctdb_client *client = dstate->client;
329         struct ctdb_db_context *ctdb_db = state->ctdb_db;
330
331         talloc_steal(client, dstate);
332         talloc_steal(dstate, dstate->call);
333
334         res = ctdb_daemon_call_recv(state, dstate->call);
335         if (res != 0) {
336                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
337                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
338
339                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
340                 return;
341         }
342
343         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
344         /* If the client asked for readonly FETCH, we remapped this to 
345            FETCH_WITH_HEADER when calling the daemon. So we must
346            strip the extra header off the reply data before passing
347            it back to the client.
348         */
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 length -= sizeof(struct ctdb_ltdb_header);
352         }
353
354         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
355                                length, struct ctdb_reply_call_old);
356         if (r == NULL) {
357                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
358                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
359                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
360                 return;
361         }
362         r->hdr.reqid        = dstate->reqid;
363         r->status           = dstate->call->status;
364
365         if (dstate->readonly_fetch
366         && dstate->client_callid == CTDB_FETCH_FUNC) {
367                 /* client only asked for a FETCH so we must strip off
368                    the extra ctdb_ltdb header
369                 */
370                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
371                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
372         } else {
373                 r->datalen          = dstate->call->reply_data.dsize;
374                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
375         }
376
377         res = daemon_queue_send(client, &r->hdr);
378         if (res == -1) {
379                 /* client is dead - return immediately */
380                 return;
381         }
382         if (res != 0) {
383                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
384         }
385         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
386         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
387         talloc_free(dstate);
388 }
389
390 struct ctdb_daemon_packet_wrap {
391         struct ctdb_context *ctdb;
392         uint32_t client_id;
393 };
394
395 /*
396   a wrapper to catch disconnected clients
397  */
398 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
399 {
400         struct ctdb_client *client;
401         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
402                                                             struct ctdb_daemon_packet_wrap);
403         if (w == NULL) {
404                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
405                 return;
406         }
407
408         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
409         if (client == NULL) {
410                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
411                          w->client_id));
412                 talloc_free(w);
413                 return;
414         }
415         talloc_free(w);
416
417         /* process it */
418         daemon_incoming_packet(client, hdr);    
419 }
420
421 struct ctdb_deferred_fetch_call {
422         struct ctdb_deferred_fetch_call *next, *prev;
423         struct ctdb_req_call_old *c;
424         struct ctdb_daemon_packet_wrap *w;
425 };
426
427 struct ctdb_deferred_fetch_queue {
428         struct ctdb_deferred_fetch_call *deferred_calls;
429 };
430
431 struct ctdb_deferred_requeue {
432         struct ctdb_deferred_fetch_call *dfc;
433         struct ctdb_client *client;
434 };
435
436 /* called from a timer event and starts reprocessing the deferred call.*/
437 static void reprocess_deferred_call(struct tevent_context *ev,
438                                     struct tevent_timer *te,
439                                     struct timeval t, void *private_data)
440 {
441         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
442         struct ctdb_client *client = dfr->client;
443
444         talloc_steal(client, dfr->dfc->c);
445         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
446         talloc_free(dfr);
447 }
448
449 /* the referral context is destroyed either after a timeout or when the initial
450    fetch-lock has finished.
451    at this stage, immediately start reprocessing the queued up deferred
452    calls so they get reprocessed immediately (and since we are dmaster at
453    this stage, trigger the waiting smbd processes to pick up and aquire the
454    record right away.
455 */
456 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
457 {
458
459         /* need to reprocess the packets from the queue explicitely instead of
460            just using a normal destructor since we want, need, to
461            call the clients in the same oder as the requests queued up
462         */
463         while (dfq->deferred_calls != NULL) {
464                 struct ctdb_client *client;
465                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
466                 struct ctdb_deferred_requeue *dfr;
467
468                 DLIST_REMOVE(dfq->deferred_calls, dfc);
469
470                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
471                 if (client == NULL) {
472                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
473                                  dfc->w->client_id));
474                         continue;
475                 }
476
477                 /* process it by pushing it back onto the eventloop */
478                 dfr = talloc(client, struct ctdb_deferred_requeue);
479                 if (dfr == NULL) {
480                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
481                         continue;
482                 }
483
484                 dfr->dfc    = talloc_steal(dfr, dfc);
485                 dfr->client = client;
486
487                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
488                                  reprocess_deferred_call, dfr);
489         }
490
491         return 0;
492 }
493
494 /* insert the new deferral context into the rb tree.
495    there should never be a pre-existing context here, but check for it
496    warn and destroy the previous context if there is already a deferral context
497    for this key.
498 */
499 static void *insert_dfq_callback(void *parm, void *data)
500 {
501         if (data) {
502                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
503                 talloc_free(data);
504         }
505         return parm;
506 }
507
508 /* if the original fetch-lock did not complete within a reasonable time,
509    free the context and context for all deferred requests to cause them to be
510    re-inserted into the event system.
511 */
512 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
513                         struct timeval t, void *private_data)
514 {
515         talloc_free(private_data);
516 }
517
518 /* This function is used in the local daemon to register a KEY in a database
519    for being "fetched"
520    While the remote fetch is in-flight, any futher attempts to re-fetch the
521    same record will be deferred until the fetch completes.
522 */
523 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
524 {
525         uint32_t *k;
526         struct ctdb_deferred_fetch_queue *dfq;
527
528         k = ctdb_key_to_idkey(call, call->key);
529         if (k == NULL) {
530                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
531                 return -1;
532         }
533
534         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
535         if (dfq == NULL) {
536                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
537                 talloc_free(k);
538                 return -1;
539         }
540         dfq->deferred_calls = NULL;
541
542         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
543
544         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
545
546         /* if the fetch havent completed in 30 seconds, just tear it all down
547            and let it try again as the events are reissued */
548         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
549                          dfq_timeout, dfq);
550
551         talloc_free(k);
552         return 0;
553 }
554
555 /* check if this is a duplicate request to a fetch already in-flight
556    if it is, make this call deferred to be reprocessed later when
557    the in-flight fetch completes.
558 */
559 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
560 {
561         uint32_t *k;
562         struct ctdb_deferred_fetch_queue *dfq;
563         struct ctdb_deferred_fetch_call *dfc;
564
565         k = ctdb_key_to_idkey(c, key);
566         if (k == NULL) {
567                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
568                 return -1;
569         }
570
571         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
572         if (dfq == NULL) {
573                 talloc_free(k);
574                 return -1;
575         }
576
577
578         talloc_free(k);
579
580         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
581         if (dfc == NULL) {
582                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
583                 return -1;
584         }
585
586         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
587         if (dfc->w == NULL) {
588                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
589                 talloc_free(dfc);
590                 return -1;
591         }
592
593         dfc->c = talloc_steal(dfc, c);
594         dfc->w->ctdb = ctdb_db->ctdb;
595         dfc->w->client_id = client->client_id;
596
597         DLIST_ADD_END(dfq->deferred_calls, dfc);
598
599         return 0;
600 }
601
602
603 /*
604   this is called when the ctdb daemon received a ctdb request call
605   from a local client over the unix domain socket
606  */
607 static void daemon_request_call_from_client(struct ctdb_client *client, 
608                                             struct ctdb_req_call_old *c)
609 {
610         struct ctdb_call_state *state;
611         struct ctdb_db_context *ctdb_db;
612         struct daemon_call_state *dstate;
613         struct ctdb_call *call;
614         struct ctdb_ltdb_header header;
615         TDB_DATA key, data;
616         int ret;
617         struct ctdb_context *ctdb = client->ctdb;
618         struct ctdb_daemon_packet_wrap *w;
619
620         CTDB_INCREMENT_STAT(ctdb, total_calls);
621         CTDB_INCREMENT_STAT(ctdb, pending_calls);
622
623         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
624         if (!ctdb_db) {
625                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
626                           c->db_id));
627                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
628                 return;
629         }
630
631         if (ctdb_db->unhealthy_reason) {
632                 /*
633                  * this is just a warning, as the tdb should be empty anyway,
634                  * and only persistent databases can be unhealthy, which doesn't
635                  * use this code patch
636                  */
637                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
638                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
639         }
640
641         key.dptr = c->data;
642         key.dsize = c->keylen;
643
644         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
645         CTDB_NO_MEMORY_VOID(ctdb, w);   
646
647         w->ctdb = ctdb;
648         w->client_id = client->client_id;
649
650         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
651                                            (struct ctdb_req_header *)c, &data,
652                                            daemon_incoming_packet_wrap, w, true);
653         if (ret == -2) {
654                 /* will retry later */
655                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
656                 return;
657         }
658
659         talloc_free(w);
660
661         if (ret != 0) {
662                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
663                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
664                 return;
665         }
666
667
668         /* check if this fetch request is a duplicate for a
669            request we already have in flight. If so defer it until
670            the first request completes.
671         */
672         if (ctdb->tunable.fetch_collapse == 1) {
673                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
674                         ret = ctdb_ltdb_unlock(ctdb_db, key);
675                         if (ret != 0) {
676                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
677                         }
678                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
679                         talloc_free(data.dptr);
680                         return;
681                 }
682         }
683
684         /* Dont do READONLY if we don't have a tracking database */
685         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db_readonly(ctdb_db)) {
686                 c->flags &= ~CTDB_WANT_READONLY;
687         }
688
689         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
690                 header.flags &= ~CTDB_REC_RO_FLAGS;
691                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
692                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
693                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
694                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
695                 }
696                 /* and clear out the tracking data */
697                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
698                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
699                 }
700         }
701
702         /* if we are revoking, we must defer all other calls until the revoke
703          * had completed.
704          */
705         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
706                 talloc_free(data.dptr);
707                 ret = ctdb_ltdb_unlock(ctdb_db, key);
708
709                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
710                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
711                 }
712                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
713                 return;
714         }
715
716         if ((header.dmaster == ctdb->pnn)
717         && (!(c->flags & CTDB_WANT_READONLY))
718         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
719                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
720                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
721                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
722                 }
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724
725                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
726                         ctdb_fatal(ctdb, "Failed to start record revoke");
727                 }
728                 talloc_free(data.dptr);
729
730                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
731                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
732                 }
733
734                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
735                 return;
736         }               
737
738         dstate = talloc(client, struct daemon_call_state);
739         if (dstate == NULL) {
740                 ret = ctdb_ltdb_unlock(ctdb_db, key);
741                 if (ret != 0) {
742                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
743                 }
744
745                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
746                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
747                 return;
748         }
749         dstate->start_time = timeval_current();
750         dstate->client = client;
751         dstate->reqid  = c->hdr.reqid;
752         talloc_steal(dstate, data.dptr);
753
754         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
755         if (call == NULL) {
756                 ret = ctdb_ltdb_unlock(ctdb_db, key);
757                 if (ret != 0) {
758                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
759                 }
760
761                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
762                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
763                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
764                 return;
765         }
766
767         dstate->readonly_fetch = 0;
768         call->call_id = c->callid;
769         call->key = key;
770         call->call_data.dptr = c->data + c->keylen;
771         call->call_data.dsize = c->calldatalen;
772         call->flags = c->flags;
773
774         if (c->flags & CTDB_WANT_READONLY) {
775                 /* client wants readonly record, so translate this into a 
776                    fetch with header. remember what the client asked for
777                    so we can remap the reply back to the proper format for
778                    the client in the reply
779                  */
780                 dstate->client_callid = call->call_id;
781                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
782                 dstate->readonly_fetch = 1;
783         }
784
785         if (header.dmaster == ctdb->pnn) {
786                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
787         } else {
788                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
789                 if (ctdb->tunable.fetch_collapse == 1) {
790                         /* This request triggered a remote fetch-lock.
791                            set up a deferral for this key so any additional
792                            fetch-locks are deferred until the current one
793                            finishes.
794                          */
795                         setup_deferred_fetch_locks(ctdb_db, call);
796                 }
797         }
798
799         ret = ctdb_ltdb_unlock(ctdb_db, key);
800         if (ret != 0) {
801                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
802         }
803
804         if (state == NULL) {
805                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
806                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
807                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
808                 return;
809         }
810         talloc_steal(state, dstate);
811         talloc_steal(client, state);
812
813         state->async.fn = daemon_call_from_client_callback;
814         state->async.private_data = dstate;
815 }
816
817
818 static void daemon_request_control_from_client(struct ctdb_client *client, 
819                                                struct ctdb_req_control_old *c);
820
821 /* data contains a packet from the client */
822 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
823 {
824         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
825         TALLOC_CTX *tmp_ctx;
826         struct ctdb_context *ctdb = client->ctdb;
827
828         /* place the packet as a child of a tmp_ctx. We then use
829            talloc_free() below to free it. If any of the calls want
830            to keep it, then they will steal it somewhere else, and the
831            talloc_free() will be a no-op */
832         tmp_ctx = talloc_new(client);
833         talloc_steal(tmp_ctx, hdr);
834
835         if (hdr->ctdb_magic != CTDB_MAGIC) {
836                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
837                 goto done;
838         }
839
840         if (hdr->ctdb_version != CTDB_PROTOCOL) {
841                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
842                 goto done;
843         }
844
845         switch (hdr->operation) {
846         case CTDB_REQ_CALL:
847                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
848                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
849                 break;
850
851         case CTDB_REQ_MESSAGE:
852                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
853                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
854                 break;
855
856         case CTDB_REQ_CONTROL:
857                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
858                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
859                 break;
860
861         default:
862                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
863                          hdr->operation));
864         }
865
866 done:
867         talloc_free(tmp_ctx);
868 }
869
870 /*
871   called when the daemon gets a incoming packet
872  */
873 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
874 {
875         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
876         struct ctdb_req_header *hdr;
877
878         if (cnt == 0) {
879                 talloc_free(client);
880                 return;
881         }
882
883         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
884
885         if (cnt < sizeof(*hdr)) {
886                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
887                                (unsigned)cnt);
888                 return;
889         }
890         hdr = (struct ctdb_req_header *)data;
891         if (cnt != hdr->length) {
892                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
893                                (unsigned)hdr->length, (unsigned)cnt);
894                 return;
895         }
896
897         if (hdr->ctdb_magic != CTDB_MAGIC) {
898                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
899                 return;
900         }
901
902         if (hdr->ctdb_version != CTDB_PROTOCOL) {
903                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
904                 return;
905         }
906
907         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
908                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
909                  hdr->srcnode, hdr->destnode));
910
911         /* it is the responsibility of the incoming packet function to free 'data' */
912         daemon_incoming_packet(client, hdr);
913 }
914
915
916 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
917 {
918         if (client_pid->ctdb->client_pids != NULL) {
919                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
920         }
921
922         return 0;
923 }
924
925
926 static void ctdb_accept_client(struct tevent_context *ev,
927                                struct tevent_fd *fde, uint16_t flags,
928                                void *private_data)
929 {
930         struct sockaddr_un addr;
931         socklen_t len;
932         int fd;
933         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
934         struct ctdb_client *client;
935         struct ctdb_client_pid_list *client_pid;
936         pid_t peer_pid = 0;
937         int ret;
938
939         memset(&addr, 0, sizeof(addr));
940         len = sizeof(addr);
941         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
942         if (fd == -1) {
943                 return;
944         }
945
946         ret = set_blocking(fd, false);
947         if (ret != 0) {
948                 DEBUG(DEBUG_ERR,
949                       (__location__
950                        " failed to set socket non-blocking (%s)\n",
951                        strerror(errno)));
952                 close(fd);
953                 return;
954         }
955
956         set_close_on_exec(fd);
957
958         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
959
960         client = talloc_zero(ctdb, struct ctdb_client);
961         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
962                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
963         }
964
965         client->ctdb = ctdb;
966         client->fd = fd;
967         client->client_id = reqid_new(ctdb->idr, client);
968         client->pid = peer_pid;
969
970         client_pid = talloc(client, struct ctdb_client_pid_list);
971         if (client_pid == NULL) {
972                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
973                 close(fd);
974                 talloc_free(client);
975                 return;
976         }               
977         client_pid->ctdb   = ctdb;
978         client_pid->pid    = peer_pid;
979         client_pid->client = client;
980
981         DLIST_ADD(ctdb->client_pids, client_pid);
982
983         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
984                                          ctdb_daemon_read_cb, client,
985                                          "client-%u", client->pid);
986
987         talloc_set_destructor(client, ctdb_client_destructor);
988         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
989         ctdb->num_clients++;
990 }
991
992
993
994 /*
995   create a unix domain socket and bind it
996   return a file descriptor open on the socket 
997 */
998 static int ux_socket_bind(struct ctdb_context *ctdb)
999 {
1000         struct sockaddr_un addr;
1001         int ret;
1002
1003         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
1004         if (ctdb->daemon.sd == -1) {
1005                 return -1;
1006         }
1007
1008         memset(&addr, 0, sizeof(addr));
1009         addr.sun_family = AF_UNIX;
1010         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
1011
1012         if (! sock_clean(ctdb->daemon.name)) {
1013                 return -1;
1014         }
1015
1016         set_close_on_exec(ctdb->daemon.sd);
1017
1018         ret = set_blocking(ctdb->daemon.sd, false);
1019         if (ret != 0) {
1020                 DEBUG(DEBUG_ERR,
1021                       (__location__
1022                        " failed to set socket non-blocking (%s)\n",
1023                        strerror(errno)));
1024                 goto failed;
1025         }
1026
1027         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1028                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1029                 goto failed;
1030         }
1031
1032         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1033             chmod(ctdb->daemon.name, 0700) != 0) {
1034                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1035                 goto failed;
1036         }
1037
1038
1039         if (listen(ctdb->daemon.sd, 100) != 0) {
1040                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1041                 goto failed;
1042         }
1043
1044         DEBUG(DEBUG_NOTICE, ("Listening to ctdb socket %s\n",
1045                              ctdb->daemon.name));
1046         return 0;
1047
1048 failed:
1049         close(ctdb->daemon.sd);
1050         ctdb->daemon.sd = -1;
1051         return -1;      
1052 }
1053
1054 static void initialise_node_flags (struct ctdb_context *ctdb)
1055 {
1056         if (ctdb->pnn == -1) {
1057                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1058         }
1059
1060         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1061
1062         /* do we start out in DISABLED mode? */
1063         if (ctdb->start_as_disabled != 0) {
1064                 DEBUG(DEBUG_ERR,
1065                       ("This node is configured to start in DISABLED state\n"));
1066                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1067         }
1068         /* do we start out in STOPPED mode? */
1069         if (ctdb->start_as_stopped != 0) {
1070                 DEBUG(DEBUG_ERR,
1071                       ("This node is configured to start in STOPPED state\n"));
1072                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1073         }
1074 }
1075
1076 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1077                                       void *private_data)
1078 {
1079         if (status != 0) {
1080                 ctdb_die(ctdb, "Failed to run setup event");
1081         }
1082         ctdb_run_notification_script(ctdb, "setup");
1083
1084         /* tell all other nodes we've just started up */
1085         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1086                                  0, CTDB_CONTROL_STARTUP, 0,
1087                                  CTDB_CTRL_FLAG_NOREPLY,
1088                                  tdb_null, NULL, NULL);
1089
1090         /* Start the recovery daemon */
1091         if (ctdb_start_recoverd(ctdb) != 0) {
1092                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1093                 exit(11);
1094         }
1095
1096         ctdb_start_periodic_events(ctdb);
1097
1098         ctdb_wait_for_first_recovery(ctdb);
1099 }
1100
1101 static struct timeval tevent_before_wait_ts;
1102 static struct timeval tevent_after_wait_ts;
1103
1104 static void ctdb_tevent_trace_init(void)
1105 {
1106         struct timeval now;
1107
1108         now = timeval_current();
1109
1110         tevent_before_wait_ts = now;
1111         tevent_after_wait_ts = now;
1112 }
1113
1114 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1115                               void *private_data)
1116 {
1117         struct timeval diff;
1118         struct timeval now;
1119         struct ctdb_context *ctdb =
1120                 talloc_get_type(private_data, struct ctdb_context);
1121
1122         if (getpid() != ctdb->ctdbd_pid) {
1123                 return;
1124         }
1125
1126         now = timeval_current();
1127
1128         switch (tp) {
1129         case TEVENT_TRACE_BEFORE_WAIT:
1130                 diff = timeval_until(&tevent_after_wait_ts, &now);
1131                 if (diff.tv_sec > 3) {
1132                         DEBUG(DEBUG_ERR,
1133                               ("Handling event took %ld seconds!\n",
1134                                (long)diff.tv_sec));
1135                 }
1136                 tevent_before_wait_ts = now;
1137                 break;
1138
1139         case TEVENT_TRACE_AFTER_WAIT:
1140                 diff = timeval_until(&tevent_before_wait_ts, &now);
1141                 if (diff.tv_sec > 3) {
1142                         DEBUG(DEBUG_ERR,
1143                               ("No event for %ld seconds!\n",
1144                                (long)diff.tv_sec));
1145                 }
1146                 tevent_after_wait_ts = now;
1147                 break;
1148
1149         default:
1150                 /* Do nothing for future tevent trace points */ ;
1151         }
1152 }
1153
1154 static void ctdb_remove_pidfile(void)
1155 {
1156         TALLOC_FREE(ctdbd_pidfile_ctx);
1157 }
1158
1159 static void ctdb_create_pidfile(TALLOC_CTX *mem_ctx)
1160 {
1161         if (ctdbd_pidfile != NULL) {
1162                 int ret = pidfile_context_create(mem_ctx, ctdbd_pidfile,
1163                                                  &ctdbd_pidfile_ctx);
1164                 if (ret != 0) {
1165                         DEBUG(DEBUG_ERR,
1166                               ("Failed to create PID file %s\n",
1167                                ctdbd_pidfile));
1168                         exit(11);
1169                 }
1170
1171                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1172                 atexit(ctdb_remove_pidfile);
1173         }
1174 }
1175
1176 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1177 {
1178         int i, j, count;
1179
1180         /* initialize the vnn mapping table, skipping any deleted nodes */
1181         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1182         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1183
1184         count = 0;
1185         for (i = 0; i < ctdb->num_nodes; i++) {
1186                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1187                         count++;
1188                 }
1189         }
1190
1191         ctdb->vnn_map->generation = INVALID_GENERATION;
1192         ctdb->vnn_map->size = count;
1193         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1194         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1195
1196         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1197                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1198                         continue;
1199                 }
1200                 ctdb->vnn_map->map[j] = i;
1201                 j++;
1202         }
1203 }
1204
1205 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1206 {
1207         int nodeid;
1208
1209         if (ctdb->address == NULL) {
1210                 ctdb_fatal(ctdb,
1211                            "Can not determine PNN - node address is not set\n");
1212         }
1213
1214         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1215         if (nodeid == -1) {
1216                 ctdb_fatal(ctdb,
1217                            "Can not determine PNN - node address not found in node list\n");
1218         }
1219
1220         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1221         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1222 }
1223
1224 /*
1225   start the protocol going as a daemon
1226 */
1227 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1228 {
1229         int res, ret = -1;
1230         struct tevent_fd *fde;
1231
1232         become_daemon(do_fork, false, false);
1233
1234         ignore_signal(SIGPIPE);
1235         ignore_signal(SIGUSR1);
1236
1237         ctdb->ctdbd_pid = getpid();
1238         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1239                           ctdb_version_string, ctdb->ctdbd_pid));
1240         ctdb_create_pidfile(ctdb);
1241
1242         /* create a unix domain stream socket to listen to */
1243         res = ux_socket_bind(ctdb);
1244         if (res!=0) {
1245                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1246                 exit(10);
1247         }
1248
1249         /* Make sure we log something when the daemon terminates.
1250          * This must be the first exit handler to run (so the last to
1251          * be registered.
1252          */
1253         __ctdbd_pid = getpid();
1254         atexit(print_exit_message);
1255
1256         if (ctdb->do_setsched) {
1257                 /* try to set us up as realtime */
1258                 if (!set_scheduler()) {
1259                         exit(1);
1260                 }
1261                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1262         }
1263
1264         ctdb->ev = tevent_context_init(NULL);
1265         if (ctdb->ev == NULL) {
1266                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1267                 exit(1);
1268         }
1269         tevent_loop_allow_nesting(ctdb->ev);
1270         ctdb_tevent_trace_init();
1271         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1272
1273         /* set up a handler to pick up sigchld */
1274         if (ctdb_init_sigchld(ctdb) == NULL) {
1275                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1276                 exit(1);
1277         }
1278
1279         if (do_fork) {
1280                 ctdb_set_child_logging(ctdb);
1281         }
1282
1283         TALLOC_FREE(ctdb->srv);
1284         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1285                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1286                 exit(1);
1287         }
1288
1289         TALLOC_FREE(ctdb->tunnels);
1290         if (srvid_init(ctdb, &ctdb->tunnels) != 0) {
1291                 DEBUG(DEBUG_ERR, ("Failed to setup tunnels context\n"));
1292                 exit(1);
1293         }
1294
1295         /* initialize statistics collection */
1296         ctdb_statistics_init(ctdb);
1297
1298         /* force initial recovery for election */
1299         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1300
1301         if (ctdb_start_eventd(ctdb) != 0) {
1302                 DEBUG(DEBUG_ERR, ("Failed to start event daemon\n"));
1303                 exit(1);
1304         }
1305
1306         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1307         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1308         if (ret != 0) {
1309                 ctdb_die(ctdb, "Failed to run init event\n");
1310         }
1311         ctdb_run_notification_script(ctdb, "init");
1312
1313         if (strcmp(ctdb->transport, "tcp") == 0) {
1314                 ret = ctdb_tcp_init(ctdb);
1315         }
1316 #ifdef USE_INFINIBAND
1317         if (strcmp(ctdb->transport, "ib") == 0) {
1318                 ret = ctdb_ibw_init(ctdb);
1319         }
1320 #endif
1321         if (ret != 0) {
1322                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1323                 return -1;
1324         }
1325
1326         if (ctdb->methods == NULL) {
1327                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1328                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1329         }
1330
1331         /* Initialise the transport.  This sets the node address if it
1332          * was not set via the command-line. */
1333         if (ctdb->methods->initialise(ctdb) != 0) {
1334                 ctdb_fatal(ctdb, "transport failed to initialise");
1335         }
1336
1337         ctdb_set_my_pnn(ctdb);
1338
1339         initialise_node_flags(ctdb);
1340
1341         if (ctdb->public_addresses_file) {
1342                 ret = ctdb_set_public_addresses(ctdb, true);
1343                 if (ret == -1) {
1344                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1345                         exit(1);
1346                 }
1347         }
1348
1349         ctdb_initialise_vnn_map(ctdb);
1350
1351         /* attach to existing databases */
1352         if (ctdb_attach_databases(ctdb) != 0) {
1353                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1354         }
1355
1356         /* start frozen, then let the first election sort things out */
1357         if (!ctdb_blocking_freeze(ctdb)) {
1358                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1359         }
1360
1361         /* now start accepting clients, only can do this once frozen */
1362         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1363                             ctdb_accept_client, ctdb);
1364         if (fde == NULL) {
1365                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1366         }
1367         tevent_fd_set_auto_close(fde);
1368
1369         /* Start the transport */
1370         if (ctdb->methods->start(ctdb) != 0) {
1371                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1372                 ctdb_fatal(ctdb, "transport failed to start");
1373         }
1374
1375         /* Recovery daemon and timed events are started from the
1376          * callback, only after the setup event completes
1377          * successfully.
1378          */
1379         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1380         ret = ctdb_event_script_callback(ctdb,
1381                                          ctdb,
1382                                          ctdb_setup_event_callback,
1383                                          ctdb,
1384                                          CTDB_EVENT_SETUP,
1385                                          "%s",
1386                                          "");
1387         if (ret != 0) {
1388                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1389                 exit(1);
1390         }
1391
1392         lockdown_memory(ctdb->valgrinding);
1393
1394         /* go into a wait loop to allow other nodes to complete */
1395         tevent_loop_wait(ctdb->ev);
1396
1397         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1398         exit(1);
1399 }
1400
1401 /*
1402   allocate a packet for use in daemon<->daemon communication
1403  */
1404 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1405                                                  TALLOC_CTX *mem_ctx, 
1406                                                  enum ctdb_operation operation, 
1407                                                  size_t length, size_t slength,
1408                                                  const char *type)
1409 {
1410         int size;
1411         struct ctdb_req_header *hdr;
1412
1413         length = MAX(length, slength);
1414         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1415
1416         if (ctdb->methods == NULL) {
1417                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1418                          operation, (unsigned)length));
1419                 return NULL;
1420         }
1421
1422         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1423         if (hdr == NULL) {
1424                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1425                          operation, (unsigned)length));
1426                 return NULL;
1427         }
1428         talloc_set_name_const(hdr, type);
1429         memset(hdr, 0, slength);
1430         hdr->length       = length;
1431         hdr->operation    = operation;
1432         hdr->ctdb_magic   = CTDB_MAGIC;
1433         hdr->ctdb_version = CTDB_PROTOCOL;
1434         hdr->generation   = ctdb->vnn_map->generation;
1435         hdr->srcnode      = ctdb->pnn;
1436
1437         return hdr;     
1438 }
1439
1440 struct daemon_control_state {
1441         struct daemon_control_state *next, *prev;
1442         struct ctdb_client *client;
1443         struct ctdb_req_control_old *c;
1444         uint32_t reqid;
1445         struct ctdb_node *node;
1446 };
1447
1448 /*
1449   callback when a control reply comes in
1450  */
1451 static void daemon_control_callback(struct ctdb_context *ctdb,
1452                                     int32_t status, TDB_DATA data, 
1453                                     const char *errormsg,
1454                                     void *private_data)
1455 {
1456         struct daemon_control_state *state = talloc_get_type(private_data, 
1457                                                              struct daemon_control_state);
1458         struct ctdb_client *client = state->client;
1459         struct ctdb_reply_control_old *r;
1460         size_t len;
1461         int ret;
1462
1463         /* construct a message to send to the client containing the data */
1464         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1465         if (errormsg) {
1466                 len += strlen(errormsg);
1467         }
1468         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1469                                struct ctdb_reply_control_old);
1470         CTDB_NO_MEMORY_VOID(ctdb, r);
1471
1472         r->hdr.reqid     = state->reqid;
1473         r->status        = status;
1474         r->datalen       = data.dsize;
1475         r->errorlen = 0;
1476         memcpy(&r->data[0], data.dptr, data.dsize);
1477         if (errormsg) {
1478                 r->errorlen = strlen(errormsg);
1479                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1480         }
1481
1482         ret = daemon_queue_send(client, &r->hdr);
1483         if (ret != -1) {
1484                 talloc_free(state);
1485         }
1486 }
1487
1488 /*
1489   fail all pending controls to a disconnected node
1490  */
1491 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1492 {
1493         struct daemon_control_state *state;
1494         while ((state = node->pending_controls)) {
1495                 DLIST_REMOVE(node->pending_controls, state);
1496                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1497                                         "node is disconnected", state);
1498         }
1499 }
1500
1501 /*
1502   destroy a daemon_control_state
1503  */
1504 static int daemon_control_destructor(struct daemon_control_state *state)
1505 {
1506         if (state->node) {
1507                 DLIST_REMOVE(state->node->pending_controls, state);
1508         }
1509         return 0;
1510 }
1511
1512 /*
1513   this is called when the ctdb daemon received a ctdb request control
1514   from a local client over the unix domain socket
1515  */
1516 static void daemon_request_control_from_client(struct ctdb_client *client, 
1517                                                struct ctdb_req_control_old *c)
1518 {
1519         TDB_DATA data;
1520         int res;
1521         struct daemon_control_state *state;
1522         TALLOC_CTX *tmp_ctx = talloc_new(client);
1523
1524         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1525                 c->hdr.destnode = client->ctdb->pnn;
1526         }
1527
1528         state = talloc(client, struct daemon_control_state);
1529         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1530
1531         state->client = client;
1532         state->c = talloc_steal(state, c);
1533         state->reqid = c->hdr.reqid;
1534         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1535                 state->node = client->ctdb->nodes[c->hdr.destnode];
1536                 DLIST_ADD(state->node->pending_controls, state);
1537         } else {
1538                 state->node = NULL;
1539         }
1540
1541         talloc_set_destructor(state, daemon_control_destructor);
1542
1543         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1544                 talloc_steal(tmp_ctx, state);
1545         }
1546         
1547         data.dptr = &c->data[0];
1548         data.dsize = c->datalen;
1549         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1550                                        c->srvid, c->opcode, client->client_id,
1551                                        c->flags,
1552                                        data, daemon_control_callback,
1553                                        state);
1554         if (res != 0) {
1555                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1556                          c->hdr.destnode));
1557         }
1558
1559         talloc_free(tmp_ctx);
1560 }
1561
1562 /*
1563   register a call function
1564 */
1565 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1566                          ctdb_fn_t fn, int id)
1567 {
1568         struct ctdb_registered_call *call;
1569         struct ctdb_db_context *ctdb_db;
1570
1571         ctdb_db = find_ctdb_db(ctdb, db_id);
1572         if (ctdb_db == NULL) {
1573                 return -1;
1574         }
1575
1576         call = talloc(ctdb_db, struct ctdb_registered_call);
1577         call->fn = fn;
1578         call->id = id;
1579
1580         DLIST_ADD(ctdb_db->calls, call);        
1581         return 0;
1582 }
1583
1584
1585
1586 /*
1587   this local messaging handler is ugly, but is needed to prevent
1588   recursion in ctdb_send_message() when the destination node is the
1589   same as the source node
1590  */
1591 struct ctdb_local_message {
1592         struct ctdb_context *ctdb;
1593         uint64_t srvid;
1594         TDB_DATA data;
1595 };
1596
1597 static void ctdb_local_message_trigger(struct tevent_context *ev,
1598                                        struct tevent_timer *te,
1599                                        struct timeval t, void *private_data)
1600 {
1601         struct ctdb_local_message *m = talloc_get_type(
1602                 private_data, struct ctdb_local_message);
1603
1604         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1605         talloc_free(m);
1606 }
1607
1608 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1609 {
1610         struct ctdb_local_message *m;
1611         m = talloc(ctdb, struct ctdb_local_message);
1612         CTDB_NO_MEMORY(ctdb, m);
1613
1614         m->ctdb = ctdb;
1615         m->srvid = srvid;
1616         m->data  = data;
1617         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1618         if (m->data.dptr == NULL) {
1619                 talloc_free(m);
1620                 return -1;
1621         }
1622
1623         /* this needs to be done as an event to prevent recursion */
1624         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1625                          ctdb_local_message_trigger, m);
1626         return 0;
1627 }
1628
1629 /*
1630   send a ctdb message
1631 */
1632 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1633                              uint64_t srvid, TDB_DATA data)
1634 {
1635         struct ctdb_req_message_old *r;
1636         int len;
1637
1638         if (ctdb->methods == NULL) {
1639                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1640                 return -1;
1641         }
1642
1643         /* see if this is a message to ourselves */
1644         if (pnn == ctdb->pnn) {
1645                 return ctdb_local_message(ctdb, srvid, data);
1646         }
1647
1648         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1649         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1650                                     struct ctdb_req_message_old);
1651         CTDB_NO_MEMORY(ctdb, r);
1652
1653         r->hdr.destnode  = pnn;
1654         r->srvid         = srvid;
1655         r->datalen       = data.dsize;
1656         memcpy(&r->data[0], data.dptr, data.dsize);
1657
1658         ctdb_queue_packet(ctdb, &r->hdr);
1659
1660         talloc_free(r);
1661         return 0;
1662 }
1663
1664
1665
1666 struct ctdb_client_notify_list {
1667         struct ctdb_client_notify_list *next, *prev;
1668         struct ctdb_context *ctdb;
1669         uint64_t srvid;
1670         TDB_DATA data;
1671 };
1672
1673
1674 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1675 {
1676         int ret;
1677
1678         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1679
1680         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1681         if (ret != 0) {
1682                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1683         }
1684
1685         return 0;
1686 }
1687
1688 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1689 {
1690         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1691         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1692         struct ctdb_client_notify_list *nl;
1693
1694         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1695
1696         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1697                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1698                 return -1;
1699         }
1700
1701         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1702                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1703                 return -1;
1704         }
1705
1706
1707         if (client == NULL) {
1708                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1709                 return -1;
1710         }
1711
1712         for(nl=client->notify; nl; nl=nl->next) {
1713                 if (nl->srvid == notify->srvid) {
1714                         break;
1715                 }
1716         }
1717         if (nl != NULL) {
1718                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1719                 return -1;
1720         }
1721
1722         nl = talloc(client, struct ctdb_client_notify_list);
1723         CTDB_NO_MEMORY(ctdb, nl);
1724         nl->ctdb       = ctdb;
1725         nl->srvid      = notify->srvid;
1726         nl->data.dsize = notify->len;
1727         nl->data.dptr  = talloc_memdup(nl, notify->notify_data,
1728                                        nl->data.dsize);
1729         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1730         
1731         DLIST_ADD(client->notify, nl);
1732         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1733
1734         return 0;
1735 }
1736
1737 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1738 {
1739         uint64_t srvid = *(uint64_t *)indata.dptr;
1740         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1741         struct ctdb_client_notify_list *nl;
1742
1743         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1744
1745         if (client == NULL) {
1746                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1747                 return -1;
1748         }
1749
1750         for(nl=client->notify; nl; nl=nl->next) {
1751                 if (nl->srvid == srvid) {
1752                         break;
1753                 }
1754         }
1755         if (nl == NULL) {
1756                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1757                 return -1;
1758         }
1759
1760         DLIST_REMOVE(client->notify, nl);
1761         talloc_set_destructor(nl, NULL);
1762         talloc_free(nl);
1763
1764         return 0;
1765 }
1766
1767 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1768 {
1769         struct ctdb_client_pid_list *client_pid;
1770
1771         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1772                 if (client_pid->pid == pid) {
1773                         return client_pid->client;
1774                 }
1775         }
1776         return NULL;
1777 }
1778
1779
1780 /* This control is used by samba when probing if a process (of a samba daemon)
1781    exists on the node.
1782    Samba does this when it needs/wants to check if a subrecord in one of the
1783    databases is still valid, or if it is stale and can be removed.
1784    If the node is in unhealthy or stopped state we just kill of the samba
1785    process holding this sub-record and return to the calling samba that
1786    the process does not exist.
1787    This allows us to forcefully recall subrecords registered by samba processes
1788    on banned and stopped nodes.
1789 */
1790 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1791 {
1792         struct ctdb_client *client;
1793
1794         client = ctdb_find_client_by_pid(ctdb, pid);
1795         if (client == NULL) {
1796                 return -1;
1797         }
1798
1799         if (ctdb->nodes[ctdb->pnn]->flags & NODE_FLAGS_INACTIVE) {
1800                 DEBUG(DEBUG_NOTICE,
1801                       ("Killing client with pid:%d on banned/stopped node\n",
1802                        (int)pid));
1803                 talloc_free(client);
1804                 return -1;
1805         }
1806
1807         return kill(pid, 0);
1808 }
1809
1810 int32_t ctdb_control_check_pid_srvid(struct ctdb_context *ctdb,
1811                                      TDB_DATA indata)
1812 {
1813         struct ctdb_client_pid_list *client_pid;
1814         pid_t pid;
1815         uint64_t srvid;
1816         int ret;
1817
1818         pid = *(pid_t *)indata.dptr;
1819         srvid = *(uint64_t *)(indata.dptr + sizeof(pid_t));
1820
1821         for (client_pid = ctdb->client_pids;
1822              client_pid != NULL;
1823              client_pid = client_pid->next) {
1824                 if (client_pid->pid == pid) {
1825                         ret = srvid_exists(ctdb->srv, srvid,
1826                                            client_pid->client);
1827                         if (ret == 0) {
1828                                 return 0;
1829                         }
1830                 }
1831         }
1832
1833         return -1;
1834 }
1835
1836 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1837 {
1838         struct ctdb_node_map_old *node_map = NULL;
1839
1840         CHECK_CONTROL_DATA_SIZE(0);
1841
1842         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1843         if (node_map == NULL) {
1844                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1845                 return -1;
1846         }
1847
1848         outdata->dptr  = (unsigned char *)node_map;
1849         outdata->dsize = talloc_get_size(outdata->dptr);
1850
1851         return 0;
1852 }
1853
1854 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1855 {
1856         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1857                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1858                 return;
1859         }
1860
1861         DEBUG(DEBUG_ERR,("Shutdown sequence commencing.\n"));
1862         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1863         ctdb_stop_recoverd(ctdb);
1864         ctdb_stop_keepalive(ctdb);
1865         ctdb_stop_monitoring(ctdb);
1866         ctdb_release_all_ips(ctdb);
1867         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1868         ctdb_stop_eventd(ctdb);
1869         if (ctdb->methods != NULL && ctdb->methods->shutdown != NULL) {
1870                 ctdb->methods->shutdown(ctdb);
1871         }
1872
1873         DEBUG(DEBUG_ERR,("Shutdown sequence complete, exiting.\n"));
1874         exit(exit_code);
1875 }
1876
1877 /* When forking the main daemon and the child process needs to connect
1878  * back to the daemon as a client process, this function can be used
1879  * to change the ctdb context from daemon into client mode.  The child
1880  * process must be created using ctdb_fork() and not fork() -
1881  * ctdb_fork() does some necessary housekeeping.
1882  */
1883 int switch_from_server_to_client(struct ctdb_context *ctdb)
1884 {
1885         int ret;
1886
1887         /* get a new event context */
1888         ctdb->ev = tevent_context_init(ctdb);
1889         if (ctdb->ev == NULL) {
1890                 DEBUG(DEBUG_ALERT,("tevent_context_init() failed\n"));
1891                 exit(1);
1892         }
1893         tevent_loop_allow_nesting(ctdb->ev);
1894
1895         /* Connect to main CTDB daemon */
1896         ret = ctdb_socket_connect(ctdb);
1897         if (ret != 0) {
1898                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1899                 return -1;
1900         }
1901
1902         ctdb->can_send_controls = true;
1903
1904         return 0;
1905 }