Merge commit 'ronnie/master'
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
33
34 /*
35   handler for when a node changes its flags
36 */
37 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
38                                 TDB_DATA data, void *private_data)
39 {
40         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
41
42         if (data.dsize != sizeof(*c) || !ctdb_validate_pnn(ctdb, c->pnn)) {
43                 DEBUG(DEBUG_CRIT,(__location__ "Invalid data in ctdb_node_flag_change\n"));
44                 return;
45         }
46
47         if (!ctdb_validate_pnn(ctdb, c->pnn)) {
48                 DEBUG(DEBUG_CRIT,("Bad pnn %u in flag_change_handler\n", c->pnn));
49                 return;
50         }
51
52         /* don't get the disconnected flag from the other node */
53         ctdb->nodes[c->pnn]->flags = 
54                 (ctdb->nodes[c->pnn]->flags&NODE_FLAGS_DISCONNECTED) 
55                 | (c->new_flags & ~NODE_FLAGS_DISCONNECTED);    
56         DEBUG(DEBUG_INFO,("Node flags for node %u are now 0x%x\n", c->pnn, ctdb->nodes[c->pnn]->flags));
57
58         /* make sure we don't hold any IPs when we shouldn't */
59         if (c->pnn == ctdb->pnn &&
60             (ctdb->nodes[c->pnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
61                 ctdb_release_all_ips(ctdb);
62         }
63 }
64
65 static void print_exit_message(void)
66 {
67         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
68 }
69
70
71 /* called when the "startup" event script has finished */
72 static void ctdb_start_transport(struct ctdb_context *ctdb)
73 {
74         if (ctdb->methods == NULL) {
75                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
76                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
77         }
78
79         /* start the transport running */
80         if (ctdb->methods->start(ctdb) != 0) {
81                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
82                 ctdb_fatal(ctdb, "transport failed to start");
83         }
84
85         /* start the recovery daemon process */
86         if (ctdb_start_recoverd(ctdb) != 0) {
87                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
88                 exit(11);
89         }
90
91         /* Make sure we log something when the daemon terminates */
92         atexit(print_exit_message);
93
94         /* a handler for when nodes are disabled/enabled */
95         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
96                                       flag_change_handler, NULL);
97
98         /* start monitoring for connected/disconnected nodes */
99         ctdb_start_keepalive(ctdb);
100
101         /* start monitoring for node health */
102         ctdb_start_monitoring(ctdb);
103
104         /* start periodic update of tcp tickle lists */
105         ctdb_start_tcp_tickle_update(ctdb);
106 }
107
108 static void block_signal(int signum)
109 {
110         struct sigaction act;
111
112         memset(&act, 0, sizeof(act));
113
114         act.sa_handler = SIG_IGN;
115         sigemptyset(&act.sa_mask);
116         sigaddset(&act.sa_mask, signum);
117         sigaction(signum, &act, NULL);
118 }
119
120
121 /*
122   send a packet to a client
123  */
124 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
125 {
126         client->ctdb->statistics.client_packets_sent++;
127         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
128 }
129
130 /*
131   message handler for when we are in daemon mode. This redirects the message
132   to the right client
133  */
134 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
135                                     TDB_DATA data, void *private_data)
136 {
137         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
138         struct ctdb_req_message *r;
139         int len;
140
141         /* construct a message to send to the client containing the data */
142         len = offsetof(struct ctdb_req_message, data) + data.dsize;
143         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
144                                len, struct ctdb_req_message);
145         CTDB_NO_MEMORY_VOID(ctdb, r);
146
147         talloc_set_name_const(r, "req_message packet");
148
149         r->srvid         = srvid;
150         r->datalen       = data.dsize;
151         memcpy(&r->data[0], data.dptr, data.dsize);
152
153         daemon_queue_send(client, &r->hdr);
154
155         talloc_free(r);
156 }
157                                            
158
159 /*
160   this is called when the ctdb daemon received a ctdb request to 
161   set the srvid from the client
162  */
163 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
164 {
165         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
166         int res;
167         if (client == NULL) {
168                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
169                 return -1;
170         }
171         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
172         if (res != 0) {
173                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
174                          (unsigned long long)srvid));
175         } else {
176                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
177                          (unsigned long long)srvid));
178         }
179
180         /* this is a hack for Samba - we now know the pid of the Samba client */
181         if ((srvid & 0xFFFFFFFF) == srvid &&
182             kill(srvid, 0) == 0) {
183                 client->pid = srvid;
184                 DEBUG(DEBUG_INFO,(__location__ " Registered PID %u for client %u\n",
185                          (unsigned)client->pid, client_id));
186         }
187         return res;
188 }
189
190 /*
191   this is called when the ctdb daemon received a ctdb request to 
192   remove a srvid from the client
193  */
194 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
195 {
196         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
197         if (client == NULL) {
198                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
199                 return -1;
200         }
201         return ctdb_deregister_message_handler(ctdb, srvid, client);
202 }
203
204
205 /*
206   destroy a ctdb_client
207 */
208 static int ctdb_client_destructor(struct ctdb_client *client)
209 {
210         ctdb_takeover_client_destructor_hook(client);
211         ctdb_reqid_remove(client->ctdb, client->client_id);
212         client->ctdb->statistics.num_clients--;
213         return 0;
214 }
215
216
217 /*
218   this is called when the ctdb daemon received a ctdb request message
219   from a local client over the unix domain socket
220  */
221 static void daemon_request_message_from_client(struct ctdb_client *client, 
222                                                struct ctdb_req_message *c)
223 {
224         TDB_DATA data;
225         int res;
226
227         /* maybe the message is for another client on this node */
228         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
229                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
230                 return;
231         }
232
233         /* its for a remote node */
234         data.dptr = &c->data[0];
235         data.dsize = c->datalen;
236         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
237                                        c->srvid, data);
238         if (res != 0) {
239                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
240                          c->hdr.destnode));
241         }
242 }
243
244
245 struct daemon_call_state {
246         struct ctdb_client *client;
247         uint32_t reqid;
248         struct ctdb_call *call;
249         struct timeval start_time;
250 };
251
252 /* 
253    complete a call from a client 
254 */
255 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
256 {
257         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
258                                                            struct daemon_call_state);
259         struct ctdb_reply_call *r;
260         int res;
261         uint32_t length;
262         struct ctdb_client *client = dstate->client;
263
264         talloc_steal(client, dstate);
265         talloc_steal(dstate, dstate->call);
266
267         res = ctdb_daemon_call_recv(state, dstate->call);
268         if (res != 0) {
269                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
270                 client->ctdb->statistics.pending_calls--;
271                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
272                 return;
273         }
274
275         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
276         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
277                                length, struct ctdb_reply_call);
278         if (r == NULL) {
279                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
280                 client->ctdb->statistics.pending_calls--;
281                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
282                 return;
283         }
284         r->hdr.reqid        = dstate->reqid;
285         r->datalen          = dstate->call->reply_data.dsize;
286         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
287
288         res = daemon_queue_send(client, &r->hdr);
289         if (res != 0) {
290                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
291         }
292         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
293         talloc_free(dstate);
294         client->ctdb->statistics.pending_calls--;
295 }
296
297 struct ctdb_daemon_packet_wrap {
298         struct ctdb_context *ctdb;
299         uint32_t client_id;
300 };
301
302 /*
303   a wrapper to catch disconnected clients
304  */
305 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
306 {
307         struct ctdb_client *client;
308         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
309                                                             struct ctdb_daemon_packet_wrap);
310         if (w == NULL) {
311                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
312                 return;
313         }
314
315         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
316         if (client == NULL) {
317                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
318                          w->client_id));
319                 talloc_free(w);
320                 return;
321         }
322         talloc_free(w);
323
324         /* process it */
325         daemon_incoming_packet(client, hdr);    
326 }
327
328
329 /*
330   this is called when the ctdb daemon received a ctdb request call
331   from a local client over the unix domain socket
332  */
333 static void daemon_request_call_from_client(struct ctdb_client *client, 
334                                             struct ctdb_req_call *c)
335 {
336         struct ctdb_call_state *state;
337         struct ctdb_db_context *ctdb_db;
338         struct daemon_call_state *dstate;
339         struct ctdb_call *call;
340         struct ctdb_ltdb_header header;
341         TDB_DATA key, data;
342         int ret;
343         struct ctdb_context *ctdb = client->ctdb;
344         struct ctdb_daemon_packet_wrap *w;
345
346         ctdb->statistics.total_calls++;
347         ctdb->statistics.pending_calls++;
348
349         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
350         if (!ctdb_db) {
351                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
352                           c->db_id));
353                 ctdb->statistics.pending_calls--;
354                 return;
355         }
356
357         key.dptr = c->data;
358         key.dsize = c->keylen;
359
360         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
361         CTDB_NO_MEMORY_VOID(ctdb, w);   
362
363         w->ctdb = ctdb;
364         w->client_id = client->client_id;
365
366         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
367                                            (struct ctdb_req_header *)c, &data,
368                                            daemon_incoming_packet_wrap, w, True);
369         if (ret == -2) {
370                 /* will retry later */
371                 ctdb->statistics.pending_calls--;
372                 return;
373         }
374
375         talloc_free(w);
376
377         if (ret != 0) {
378                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
379                 ctdb->statistics.pending_calls--;
380                 return;
381         }
382
383         dstate = talloc(client, struct daemon_call_state);
384         if (dstate == NULL) {
385                 ctdb_ltdb_unlock(ctdb_db, key);
386                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
387                 ctdb->statistics.pending_calls--;
388                 return;
389         }
390         dstate->start_time = timeval_current();
391         dstate->client = client;
392         dstate->reqid  = c->hdr.reqid;
393         talloc_steal(dstate, data.dptr);
394
395         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
396         if (call == NULL) {
397                 ctdb_ltdb_unlock(ctdb_db, key);
398                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
399                 ctdb->statistics.pending_calls--;
400                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
401                 return;
402         }
403
404         call->call_id = c->callid;
405         call->key = key;
406         call->call_data.dptr = c->data + c->keylen;
407         call->call_data.dsize = c->calldatalen;
408         call->flags = c->flags;
409
410         if (header.dmaster == ctdb->pnn) {
411                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
412         } else {
413                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
414         }
415
416         ctdb_ltdb_unlock(ctdb_db, key);
417
418         if (state == NULL) {
419                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
420                 ctdb->statistics.pending_calls--;
421                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
422                 return;
423         }
424         talloc_steal(state, dstate);
425         talloc_steal(client, state);
426
427         state->async.fn = daemon_call_from_client_callback;
428         state->async.private_data = dstate;
429 }
430
431
432 static void daemon_request_control_from_client(struct ctdb_client *client, 
433                                                struct ctdb_req_control *c);
434
435 /* data contains a packet from the client */
436 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
437 {
438         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
439         TALLOC_CTX *tmp_ctx;
440         struct ctdb_context *ctdb = client->ctdb;
441
442         /* place the packet as a child of a tmp_ctx. We then use
443            talloc_free() below to free it. If any of the calls want
444            to keep it, then they will steal it somewhere else, and the
445            talloc_free() will be a no-op */
446         tmp_ctx = talloc_new(client);
447         talloc_steal(tmp_ctx, hdr);
448
449         if (hdr->ctdb_magic != CTDB_MAGIC) {
450                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
451                 goto done;
452         }
453
454         if (hdr->ctdb_version != CTDB_VERSION) {
455                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
456                 goto done;
457         }
458
459         switch (hdr->operation) {
460         case CTDB_REQ_CALL:
461                 ctdb->statistics.client.req_call++;
462                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
463                 break;
464
465         case CTDB_REQ_MESSAGE:
466                 ctdb->statistics.client.req_message++;
467                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
468                 break;
469
470         case CTDB_REQ_CONTROL:
471                 ctdb->statistics.client.req_control++;
472                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
473                 break;
474
475         default:
476                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
477                          hdr->operation));
478         }
479
480 done:
481         talloc_free(tmp_ctx);
482 }
483
484 /*
485   called when the daemon gets a incoming packet
486  */
487 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
488 {
489         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
490         struct ctdb_req_header *hdr;
491
492         if (cnt == 0) {
493                 talloc_free(client);
494                 return;
495         }
496
497         client->ctdb->statistics.client_packets_recv++;
498
499         if (cnt < sizeof(*hdr)) {
500                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
501                                (unsigned)cnt);
502                 return;
503         }
504         hdr = (struct ctdb_req_header *)data;
505         if (cnt != hdr->length) {
506                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
507                                (unsigned)hdr->length, (unsigned)cnt);
508                 return;
509         }
510
511         if (hdr->ctdb_magic != CTDB_MAGIC) {
512                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
513                 return;
514         }
515
516         if (hdr->ctdb_version != CTDB_VERSION) {
517                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
518                 return;
519         }
520
521         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
522                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
523                  hdr->srcnode, hdr->destnode));
524
525         /* it is the responsibility of the incoming packet function to free 'data' */
526         daemon_incoming_packet(client, hdr);
527 }
528
529 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
530                          uint16_t flags, void *private_data)
531 {
532         struct sockaddr_in addr;
533         socklen_t len;
534         int fd;
535         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
536         struct ctdb_client *client;
537 #ifdef _AIX
538         struct peercred_struct cr;
539         socklen_t crl = sizeof(struct peercred_struct);
540 #else
541         struct ucred cr;
542         socklen_t crl = sizeof(struct ucred);
543 #endif
544
545         memset(&addr, 0, sizeof(addr));
546         len = sizeof(addr);
547         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
548         if (fd == -1) {
549                 return;
550         }
551
552         set_nonblocking(fd);
553         set_close_on_exec(fd);
554
555         client = talloc_zero(ctdb, struct ctdb_client);
556 #ifdef _AIX
557         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
558 #else
559         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
560 #endif
561                 talloc_asprintf(client, "struct ctdb_client: pid:%u", (unsigned)cr.pid);
562         }
563
564         client->ctdb = ctdb;
565         client->fd = fd;
566         client->client_id = ctdb_reqid_new(ctdb, client);
567         ctdb->statistics.num_clients++;
568
569         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
570                                          ctdb_daemon_read_cb, client);
571
572         talloc_set_destructor(client, ctdb_client_destructor);
573 }
574
575
576
577 /*
578   create a unix domain socket and bind it
579   return a file descriptor open on the socket 
580 */
581 static int ux_socket_bind(struct ctdb_context *ctdb)
582 {
583         struct sockaddr_un addr;
584
585         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
586         if (ctdb->daemon.sd == -1) {
587                 return -1;
588         }
589
590         set_close_on_exec(ctdb->daemon.sd);
591         set_nonblocking(ctdb->daemon.sd);
592
593         memset(&addr, 0, sizeof(addr));
594         addr.sun_family = AF_UNIX;
595         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
596
597         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
598                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
599                 goto failed;
600         }       
601
602         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
603             chmod(ctdb->daemon.name, 0700) != 0) {
604                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
605                 goto failed;
606         } 
607
608
609         if (listen(ctdb->daemon.sd, 10) != 0) {
610                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
611                 goto failed;
612         }
613
614         return 0;
615
616 failed:
617         close(ctdb->daemon.sd);
618         ctdb->daemon.sd = -1;
619         return -1;      
620 }
621
622 /*
623   delete the socket on exit - called on destruction of autofree context
624  */
625 static int unlink_destructor(const char *name)
626 {
627         unlink(name);
628         return 0;
629 }
630
631 static void sig_child_handler(struct event_context *ev,
632         struct signal_event *se, int signum, int count,
633         void *dont_care, 
634         void *private_data)
635 {
636 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
637         int status;
638         pid_t pid = -1;
639
640         while (pid != 0) {
641                 pid = waitpid(-1, &status, WNOHANG);
642                 if (pid == -1) {
643                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
644                         return;
645                 }
646                 if (pid > 0) {
647                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
648                 }
649         }
650 }
651
652 /*
653   start the protocol going as a daemon
654 */
655 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
656 {
657         int res, ret = -1;
658         struct fd_event *fde;
659         const char *domain_socket_name;
660         struct signal_event *se;
661
662         /* get rid of any old sockets */
663         unlink(ctdb->daemon.name);
664
665         /* create a unix domain stream socket to listen to */
666         res = ux_socket_bind(ctdb);
667         if (res!=0) {
668                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
669                 exit(10);
670         }
671
672         if (do_fork && fork()) {
673                 return 0;
674         }
675
676         tdb_reopen_all(False);
677
678         if (do_fork) {
679                 setsid();
680                 close(0);
681                 if (open("/dev/null", O_RDONLY) != 0) {
682                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
683                         exit(11);
684                 }
685         }
686         block_signal(SIGPIPE);
687
688         if (ctdb->do_setsched) {
689                 /* try to set us up as realtime */
690                 ctdb_set_scheduler(ctdb);
691         }
692
693         /* ensure the socket is deleted on exit of the daemon */
694         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
695         talloc_set_destructor(domain_socket_name, unlink_destructor);   
696
697         ctdb->ev = event_context_init(NULL);
698
699         ctdb_set_child_logging(ctdb);
700
701         /* force initial recovery for election */
702         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
703
704         if (strcmp(ctdb->transport, "tcp") == 0) {
705                 int ctdb_tcp_init(struct ctdb_context *);
706                 ret = ctdb_tcp_init(ctdb);
707         }
708 #ifdef USE_INFINIBAND
709         if (strcmp(ctdb->transport, "ib") == 0) {
710                 int ctdb_ibw_init(struct ctdb_context *);
711                 ret = ctdb_ibw_init(ctdb);
712         }
713 #endif
714         if (ret != 0) {
715                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
716                 return -1;
717         }
718
719         if (ctdb->methods == NULL) {
720                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
721                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
722         }
723
724         /* initialise the transport  */
725         if (ctdb->methods->initialise(ctdb) != 0) {
726                 ctdb_fatal(ctdb, "transport failed to initialise");
727         }
728
729         /* attach to any existing persistent databases */
730         if (ctdb_attach_persistent(ctdb) != 0) {
731                 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
732         }
733
734         /* start frozen, then let the first election sort things out */
735         if (!ctdb_blocking_freeze(ctdb)) {
736                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
737         }
738
739         /* now start accepting clients, only can do this once frozen */
740         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
741                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
742                            ctdb_accept_client, ctdb);
743
744         /* tell all other nodes we've just started up */
745         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
746                                  0, CTDB_CONTROL_STARTUP, 0,
747                                  CTDB_CTRL_FLAG_NOREPLY,
748                                  tdb_null, NULL, NULL);
749
750         /* release any IPs we hold from previous runs of the daemon */
751         ctdb_release_all_ips(ctdb);
752
753         /* start the transport going */
754         ctdb_start_transport(ctdb);
755
756         /* set up a handler to pick up sigchld */
757         se = event_add_signal(ctdb->ev, ctdb,
758                                      SIGCHLD, 0,
759                                      sig_child_handler,
760                                      ctdb);
761         if (se == NULL) {
762                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
763                 exit(1);
764         }
765           
766         /* go into a wait loop to allow other nodes to complete */
767         event_loop_wait(ctdb->ev);
768
769         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
770         exit(1);
771 }
772
773 /*
774   allocate a packet for use in daemon<->daemon communication
775  */
776 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
777                                                  TALLOC_CTX *mem_ctx, 
778                                                  enum ctdb_operation operation, 
779                                                  size_t length, size_t slength,
780                                                  const char *type)
781 {
782         int size;
783         struct ctdb_req_header *hdr;
784
785         length = MAX(length, slength);
786         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
787
788         if (ctdb->methods == NULL) {
789                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
790                          operation, (unsigned)length));
791                 return NULL;
792         }
793
794         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
795         if (hdr == NULL) {
796                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
797                          operation, (unsigned)length));
798                 return NULL;
799         }
800         talloc_set_name_const(hdr, type);
801         memset(hdr, 0, slength);
802         hdr->length       = length;
803         hdr->operation    = operation;
804         hdr->ctdb_magic   = CTDB_MAGIC;
805         hdr->ctdb_version = CTDB_VERSION;
806         hdr->generation   = ctdb->vnn_map->generation;
807         hdr->srcnode      = ctdb->pnn;
808
809         return hdr;     
810 }
811
812 struct daemon_control_state {
813         struct daemon_control_state *next, *prev;
814         struct ctdb_client *client;
815         struct ctdb_req_control *c;
816         uint32_t reqid;
817         struct ctdb_node *node;
818 };
819
820 /*
821   callback when a control reply comes in
822  */
823 static void daemon_control_callback(struct ctdb_context *ctdb,
824                                     int32_t status, TDB_DATA data, 
825                                     const char *errormsg,
826                                     void *private_data)
827 {
828         struct daemon_control_state *state = talloc_get_type(private_data, 
829                                                              struct daemon_control_state);
830         struct ctdb_client *client = state->client;
831         struct ctdb_reply_control *r;
832         size_t len;
833
834         /* construct a message to send to the client containing the data */
835         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
836         if (errormsg) {
837                 len += strlen(errormsg);
838         }
839         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
840                                struct ctdb_reply_control);
841         CTDB_NO_MEMORY_VOID(ctdb, r);
842
843         r->hdr.reqid     = state->reqid;
844         r->status        = status;
845         r->datalen       = data.dsize;
846         r->errorlen = 0;
847         memcpy(&r->data[0], data.dptr, data.dsize);
848         if (errormsg) {
849                 r->errorlen = strlen(errormsg);
850                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
851         }
852
853         daemon_queue_send(client, &r->hdr);
854
855         talloc_free(state);
856 }
857
858 /*
859   fail all pending controls to a disconnected node
860  */
861 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
862 {
863         struct daemon_control_state *state;
864         while ((state = node->pending_controls)) {
865                 DLIST_REMOVE(node->pending_controls, state);
866                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
867                                         "node is disconnected", state);
868         }
869 }
870
871 /*
872   destroy a daemon_control_state
873  */
874 static int daemon_control_destructor(struct daemon_control_state *state)
875 {
876         if (state->node) {
877                 DLIST_REMOVE(state->node->pending_controls, state);
878         }
879         return 0;
880 }
881
882 /*
883   this is called when the ctdb daemon received a ctdb request control
884   from a local client over the unix domain socket
885  */
886 static void daemon_request_control_from_client(struct ctdb_client *client, 
887                                                struct ctdb_req_control *c)
888 {
889         TDB_DATA data;
890         int res;
891         struct daemon_control_state *state;
892         TALLOC_CTX *tmp_ctx = talloc_new(client);
893
894         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
895                 c->hdr.destnode = client->ctdb->pnn;
896         }
897
898         state = talloc(client, struct daemon_control_state);
899         CTDB_NO_MEMORY_VOID(client->ctdb, state);
900
901         state->client = client;
902         state->c = talloc_steal(state, c);
903         state->reqid = c->hdr.reqid;
904         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
905                 state->node = client->ctdb->nodes[c->hdr.destnode];
906                 DLIST_ADD(state->node->pending_controls, state);
907         } else {
908                 state->node = NULL;
909         }
910
911         talloc_set_destructor(state, daemon_control_destructor);
912
913         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
914                 talloc_steal(tmp_ctx, state);
915         }
916         
917         data.dptr = &c->data[0];
918         data.dsize = c->datalen;
919         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
920                                        c->srvid, c->opcode, client->client_id,
921                                        c->flags,
922                                        data, daemon_control_callback,
923                                        state);
924         if (res != 0) {
925                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
926                          c->hdr.destnode));
927         }
928
929         talloc_free(tmp_ctx);
930 }
931
932 /*
933   register a call function
934 */
935 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
936                          ctdb_fn_t fn, int id)
937 {
938         struct ctdb_registered_call *call;
939         struct ctdb_db_context *ctdb_db;
940
941         ctdb_db = find_ctdb_db(ctdb, db_id);
942         if (ctdb_db == NULL) {
943                 return -1;
944         }
945
946         call = talloc(ctdb_db, struct ctdb_registered_call);
947         call->fn = fn;
948         call->id = id;
949
950         DLIST_ADD(ctdb_db->calls, call);        
951         return 0;
952 }
953
954
955
956 /*
957   this local messaging handler is ugly, but is needed to prevent
958   recursion in ctdb_send_message() when the destination node is the
959   same as the source node
960  */
961 struct ctdb_local_message {
962         struct ctdb_context *ctdb;
963         uint64_t srvid;
964         TDB_DATA data;
965 };
966
967 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
968                                        struct timeval t, void *private_data)
969 {
970         struct ctdb_local_message *m = talloc_get_type(private_data, 
971                                                        struct ctdb_local_message);
972         int res;
973
974         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
975         if (res != 0) {
976                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
977                           (unsigned long long)m->srvid));
978         }
979         talloc_free(m);
980 }
981
982 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
983 {
984         struct ctdb_local_message *m;
985         m = talloc(ctdb, struct ctdb_local_message);
986         CTDB_NO_MEMORY(ctdb, m);
987
988         m->ctdb = ctdb;
989         m->srvid = srvid;
990         m->data  = data;
991         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
992         if (m->data.dptr == NULL) {
993                 talloc_free(m);
994                 return -1;
995         }
996
997         /* this needs to be done as an event to prevent recursion */
998         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
999         return 0;
1000 }
1001
1002 /*
1003   send a ctdb message
1004 */
1005 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1006                              uint64_t srvid, TDB_DATA data)
1007 {
1008         struct ctdb_req_message *r;
1009         int len;
1010
1011         /* see if this is a message to ourselves */
1012         if (pnn == ctdb->pnn) {
1013                 return ctdb_local_message(ctdb, srvid, data);
1014         }
1015
1016         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1017         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1018                                     struct ctdb_req_message);
1019         CTDB_NO_MEMORY(ctdb, r);
1020
1021         r->hdr.destnode  = pnn;
1022         r->srvid         = srvid;
1023         r->datalen       = data.dsize;
1024         memcpy(&r->data[0], data.dptr, data.dsize);
1025
1026         ctdb_queue_packet(ctdb, &r->hdr);
1027
1028         talloc_free(r);
1029         return 0;
1030 }
1031