Revert "waitpid() can block if it takes a long time before the child terminates"
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
33
34 /*
35   handler for when a node changes its flags
36 */
37 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
38                                 TDB_DATA data, void *private_data)
39 {
40         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
41
42         if (data.dsize != sizeof(*c) || !ctdb_validate_pnn(ctdb, c->pnn)) {
43                 DEBUG(DEBUG_CRIT,(__location__ "Invalid data in ctdb_node_flag_change\n"));
44                 return;
45         }
46
47         if (!ctdb_validate_pnn(ctdb, c->pnn)) {
48                 DEBUG(DEBUG_CRIT,("Bad pnn %u in flag_change_handler\n", c->pnn));
49                 return;
50         }
51
52         /* don't get the disconnected flag from the other node */
53         ctdb->nodes[c->pnn]->flags = 
54                 (ctdb->nodes[c->pnn]->flags&NODE_FLAGS_DISCONNECTED) 
55                 | (c->new_flags & ~NODE_FLAGS_DISCONNECTED);    
56         DEBUG(DEBUG_INFO,("Node flags for node %u are now 0x%x\n", c->pnn, ctdb->nodes[c->pnn]->flags));
57
58         /* make sure we don't hold any IPs when we shouldn't */
59         if (c->pnn == ctdb->pnn &&
60             (ctdb->nodes[c->pnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
61                 ctdb_release_all_ips(ctdb);
62         }
63 }
64
65 static void print_exit_message(void)
66 {
67         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
68 }
69
70
71 /* called when the "startup" event script has finished */
72 static void ctdb_start_transport(struct ctdb_context *ctdb)
73 {
74         if (ctdb->methods == NULL) {
75                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
76                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
77         }
78
79         /* start the transport running */
80         if (ctdb->methods->start(ctdb) != 0) {
81                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
82                 ctdb_fatal(ctdb, "transport failed to start");
83         }
84
85         /* start the recovery daemon process */
86         if (ctdb_start_recoverd(ctdb) != 0) {
87                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
88                 exit(11);
89         }
90
91         /* Make sure we log something when the daemon terminates */
92         atexit(print_exit_message);
93
94         /* a handler for when nodes are disabled/enabled */
95         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
96                                       flag_change_handler, NULL);
97
98         /* start monitoring for connected/disconnected nodes */
99         ctdb_start_keepalive(ctdb);
100
101         /* start monitoring for node health */
102         ctdb_start_monitoring(ctdb);
103
104         /* start periodic update of tcp tickle lists */
105         ctdb_start_tcp_tickle_update(ctdb);
106 }
107
108 static void block_signal(int signum)
109 {
110         struct sigaction act;
111
112         memset(&act, 0, sizeof(act));
113
114         act.sa_handler = SIG_IGN;
115         sigemptyset(&act.sa_mask);
116         sigaddset(&act.sa_mask, signum);
117         sigaction(signum, &act, NULL);
118 }
119
120
121 /*
122   send a packet to a client
123  */
124 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
125 {
126         client->ctdb->statistics.client_packets_sent++;
127         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
128 }
129
130 /*
131   message handler for when we are in daemon mode. This redirects the message
132   to the right client
133  */
134 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
135                                     TDB_DATA data, void *private_data)
136 {
137         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
138         struct ctdb_req_message *r;
139         int len;
140
141         /* construct a message to send to the client containing the data */
142         len = offsetof(struct ctdb_req_message, data) + data.dsize;
143         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
144                                len, struct ctdb_req_message);
145         CTDB_NO_MEMORY_VOID(ctdb, r);
146
147         talloc_set_name_const(r, "req_message packet");
148
149         r->srvid         = srvid;
150         r->datalen       = data.dsize;
151         memcpy(&r->data[0], data.dptr, data.dsize);
152
153         daemon_queue_send(client, &r->hdr);
154
155         talloc_free(r);
156 }
157                                            
158
159 /*
160   this is called when the ctdb daemon received a ctdb request to 
161   set the srvid from the client
162  */
163 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
164 {
165         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
166         int res;
167         if (client == NULL) {
168                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
169                 return -1;
170         }
171         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
172         if (res != 0) {
173                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
174                          (unsigned long long)srvid));
175         } else {
176                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
177                          (unsigned long long)srvid));
178         }
179
180         /* this is a hack for Samba - we now know the pid of the Samba client */
181         if ((srvid & 0xFFFFFFFF) == srvid &&
182             kill(srvid, 0) == 0) {
183                 client->pid = srvid;
184                 DEBUG(DEBUG_INFO,(__location__ " Registered PID %u for client %u\n",
185                          (unsigned)client->pid, client_id));
186         }
187         return res;
188 }
189
190 /*
191   this is called when the ctdb daemon received a ctdb request to 
192   remove a srvid from the client
193  */
194 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
195 {
196         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
197         if (client == NULL) {
198                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
199                 return -1;
200         }
201         return ctdb_deregister_message_handler(ctdb, srvid, client);
202 }
203
204
205 /*
206   destroy a ctdb_client
207 */
208 static int ctdb_client_destructor(struct ctdb_client *client)
209 {
210         ctdb_takeover_client_destructor_hook(client);
211         ctdb_reqid_remove(client->ctdb, client->client_id);
212         client->ctdb->statistics.num_clients--;
213         return 0;
214 }
215
216
217 /*
218   this is called when the ctdb daemon received a ctdb request message
219   from a local client over the unix domain socket
220  */
221 static void daemon_request_message_from_client(struct ctdb_client *client, 
222                                                struct ctdb_req_message *c)
223 {
224         TDB_DATA data;
225         int res;
226
227         /* maybe the message is for another client on this node */
228         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
229                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
230                 return;
231         }
232
233         /* its for a remote node */
234         data.dptr = &c->data[0];
235         data.dsize = c->datalen;
236         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
237                                        c->srvid, data);
238         if (res != 0) {
239                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
240                          c->hdr.destnode));
241         }
242 }
243
244
245 struct daemon_call_state {
246         struct ctdb_client *client;
247         uint32_t reqid;
248         struct ctdb_call *call;
249         struct timeval start_time;
250 };
251
252 /* 
253    complete a call from a client 
254 */
255 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
256 {
257         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
258                                                            struct daemon_call_state);
259         struct ctdb_reply_call *r;
260         int res;
261         uint32_t length;
262         struct ctdb_client *client = dstate->client;
263
264         talloc_steal(client, dstate);
265         talloc_steal(dstate, dstate->call);
266
267         res = ctdb_daemon_call_recv(state, dstate->call);
268         if (res != 0) {
269                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
270                 client->ctdb->statistics.pending_calls--;
271                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
272                 return;
273         }
274
275         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
276         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
277                                length, struct ctdb_reply_call);
278         if (r == NULL) {
279                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
280                 client->ctdb->statistics.pending_calls--;
281                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
282                 return;
283         }
284         r->hdr.reqid        = dstate->reqid;
285         r->datalen          = dstate->call->reply_data.dsize;
286         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
287
288         res = daemon_queue_send(client, &r->hdr);
289         if (res != 0) {
290                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
291         }
292         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
293         talloc_free(dstate);
294         client->ctdb->statistics.pending_calls--;
295 }
296
297 struct ctdb_daemon_packet_wrap {
298         struct ctdb_context *ctdb;
299         uint32_t client_id;
300 };
301
302 /*
303   a wrapper to catch disconnected clients
304  */
305 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
306 {
307         struct ctdb_client *client;
308         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
309                                                             struct ctdb_daemon_packet_wrap);
310         if (w == NULL) {
311                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
312                 return;
313         }
314
315         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
316         if (client == NULL) {
317                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
318                          w->client_id));
319                 talloc_free(w);
320                 return;
321         }
322         talloc_free(w);
323
324         /* process it */
325         daemon_incoming_packet(client, hdr);    
326 }
327
328
329 /*
330   this is called when the ctdb daemon received a ctdb request call
331   from a local client over the unix domain socket
332  */
333 static void daemon_request_call_from_client(struct ctdb_client *client, 
334                                             struct ctdb_req_call *c)
335 {
336         struct ctdb_call_state *state;
337         struct ctdb_db_context *ctdb_db;
338         struct daemon_call_state *dstate;
339         struct ctdb_call *call;
340         struct ctdb_ltdb_header header;
341         TDB_DATA key, data;
342         int ret;
343         struct ctdb_context *ctdb = client->ctdb;
344         struct ctdb_daemon_packet_wrap *w;
345
346         ctdb->statistics.total_calls++;
347         ctdb->statistics.pending_calls++;
348
349         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
350         if (!ctdb_db) {
351                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
352                           c->db_id));
353                 ctdb->statistics.pending_calls--;
354                 return;
355         }
356
357         key.dptr = c->data;
358         key.dsize = c->keylen;
359
360         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
361         CTDB_NO_MEMORY_VOID(ctdb, w);   
362
363         w->ctdb = ctdb;
364         w->client_id = client->client_id;
365
366         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
367                                            (struct ctdb_req_header *)c, &data,
368                                            daemon_incoming_packet_wrap, w, True);
369         if (ret == -2) {
370                 /* will retry later */
371                 ctdb->statistics.pending_calls--;
372                 return;
373         }
374
375         talloc_free(w);
376
377         if (ret != 0) {
378                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
379                 ctdb->statistics.pending_calls--;
380                 return;
381         }
382
383         dstate = talloc(client, struct daemon_call_state);
384         if (dstate == NULL) {
385                 ctdb_ltdb_unlock(ctdb_db, key);
386                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
387                 ctdb->statistics.pending_calls--;
388                 return;
389         }
390         dstate->start_time = timeval_current();
391         dstate->client = client;
392         dstate->reqid  = c->hdr.reqid;
393         talloc_steal(dstate, data.dptr);
394
395         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
396         if (call == NULL) {
397                 ctdb_ltdb_unlock(ctdb_db, key);
398                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
399                 ctdb->statistics.pending_calls--;
400                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
401                 return;
402         }
403
404         call->call_id = c->callid;
405         call->key = key;
406         call->call_data.dptr = c->data + c->keylen;
407         call->call_data.dsize = c->calldatalen;
408         call->flags = c->flags;
409
410         if (header.dmaster == ctdb->pnn) {
411                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
412         } else {
413                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
414         }
415
416         ctdb_ltdb_unlock(ctdb_db, key);
417
418         if (state == NULL) {
419                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
420                 ctdb->statistics.pending_calls--;
421                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
422                 return;
423         }
424         talloc_steal(state, dstate);
425         talloc_steal(client, state);
426
427         state->async.fn = daemon_call_from_client_callback;
428         state->async.private_data = dstate;
429 }
430
431
432 static void daemon_request_control_from_client(struct ctdb_client *client, 
433                                                struct ctdb_req_control *c);
434
435 /* data contains a packet from the client */
436 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
437 {
438         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
439         TALLOC_CTX *tmp_ctx;
440         struct ctdb_context *ctdb = client->ctdb;
441
442         /* place the packet as a child of a tmp_ctx. We then use
443            talloc_free() below to free it. If any of the calls want
444            to keep it, then they will steal it somewhere else, and the
445            talloc_free() will be a no-op */
446         tmp_ctx = talloc_new(client);
447         talloc_steal(tmp_ctx, hdr);
448
449         if (hdr->ctdb_magic != CTDB_MAGIC) {
450                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
451                 goto done;
452         }
453
454         if (hdr->ctdb_version != CTDB_VERSION) {
455                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
456                 goto done;
457         }
458
459         switch (hdr->operation) {
460         case CTDB_REQ_CALL:
461                 ctdb->statistics.client.req_call++;
462                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
463                 break;
464
465         case CTDB_REQ_MESSAGE:
466                 ctdb->statistics.client.req_message++;
467                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
468                 break;
469
470         case CTDB_REQ_CONTROL:
471                 ctdb->statistics.client.req_control++;
472                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
473                 break;
474
475         default:
476                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
477                          hdr->operation));
478         }
479
480 done:
481         talloc_free(tmp_ctx);
482 }
483
484 /*
485   called when the daemon gets a incoming packet
486  */
487 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
488 {
489         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
490         struct ctdb_req_header *hdr;
491
492         if (cnt == 0) {
493                 talloc_free(client);
494                 return;
495         }
496
497         client->ctdb->statistics.client_packets_recv++;
498
499         if (cnt < sizeof(*hdr)) {
500                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
501                                (unsigned)cnt);
502                 return;
503         }
504         hdr = (struct ctdb_req_header *)data;
505         if (cnt != hdr->length) {
506                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
507                                (unsigned)hdr->length, (unsigned)cnt);
508                 return;
509         }
510
511         if (hdr->ctdb_magic != CTDB_MAGIC) {
512                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
513                 return;
514         }
515
516         if (hdr->ctdb_version != CTDB_VERSION) {
517                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
518                 return;
519         }
520
521         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
522                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
523                  hdr->srcnode, hdr->destnode));
524
525         /* it is the responsibility of the incoming packet function to free 'data' */
526         daemon_incoming_packet(client, hdr);
527 }
528
529 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
530                          uint16_t flags, void *private_data)
531 {
532         struct sockaddr_in addr;
533         socklen_t len;
534         int fd;
535         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
536         struct ctdb_client *client;
537 #ifdef _AIX
538         struct peercred_struct cr;
539         socklen_t crl = sizeof(struct peercred_struct);
540 #else
541         struct ucred cr;
542         socklen_t crl = sizeof(struct ucred);
543 #endif
544
545         memset(&addr, 0, sizeof(addr));
546         len = sizeof(addr);
547         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
548         if (fd == -1) {
549                 return;
550         }
551
552         set_nonblocking(fd);
553         set_close_on_exec(fd);
554
555         client = talloc_zero(ctdb, struct ctdb_client);
556 #ifdef _AIX
557         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
558 #else
559         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
560 #endif
561                 talloc_asprintf(client, "struct ctdb_client: pid:%u", (unsigned)cr.pid);
562         }
563
564         client->ctdb = ctdb;
565         client->fd = fd;
566         client->client_id = ctdb_reqid_new(ctdb, client);
567         ctdb->statistics.num_clients++;
568
569         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
570                                          ctdb_daemon_read_cb, client);
571
572         talloc_set_destructor(client, ctdb_client_destructor);
573 }
574
575
576
577 /*
578   create a unix domain socket and bind it
579   return a file descriptor open on the socket 
580 */
581 static int ux_socket_bind(struct ctdb_context *ctdb)
582 {
583         struct sockaddr_un addr;
584
585         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
586         if (ctdb->daemon.sd == -1) {
587                 return -1;
588         }
589
590         set_close_on_exec(ctdb->daemon.sd);
591         set_nonblocking(ctdb->daemon.sd);
592
593         memset(&addr, 0, sizeof(addr));
594         addr.sun_family = AF_UNIX;
595         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
596
597         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
598                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
599                 goto failed;
600         }       
601
602         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
603             chmod(ctdb->daemon.name, 0700) != 0) {
604                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
605                 goto failed;
606         } 
607
608
609         if (listen(ctdb->daemon.sd, 10) != 0) {
610                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
611                 goto failed;
612         }
613
614         return 0;
615
616 failed:
617         close(ctdb->daemon.sd);
618         ctdb->daemon.sd = -1;
619         return -1;      
620 }
621
622 /*
623   delete the socket on exit - called on destruction of autofree context
624  */
625 static int unlink_destructor(const char *name)
626 {
627         unlink(name);
628         return 0;
629 }
630
631 /*
632   start the protocol going as a daemon
633 */
634 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
635 {
636         int res, ret = -1;
637         struct fd_event *fde;
638         const char *domain_socket_name;
639
640         /* get rid of any old sockets */
641         unlink(ctdb->daemon.name);
642
643         /* create a unix domain stream socket to listen to */
644         res = ux_socket_bind(ctdb);
645         if (res!=0) {
646                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
647                 exit(10);
648         }
649
650         if (do_fork && fork()) {
651                 return 0;
652         }
653
654         tdb_reopen_all(False);
655
656         if (do_fork) {
657                 setsid();
658                 close(0);
659                 if (open("/dev/null", O_RDONLY) != 0) {
660                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
661                         exit(11);
662                 }
663         }
664         block_signal(SIGPIPE);
665
666         if (ctdb->do_setsched) {
667                 /* try to set us up as realtime */
668                 ctdb_set_scheduler(ctdb);
669         }
670
671         /* ensure the socket is deleted on exit of the daemon */
672         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
673         talloc_set_destructor(domain_socket_name, unlink_destructor);   
674
675         ctdb->ev = event_context_init(NULL);
676
677         ctdb_set_child_logging(ctdb);
678
679         /* force initial recovery for election */
680         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
681
682         if (strcmp(ctdb->transport, "tcp") == 0) {
683                 int ctdb_tcp_init(struct ctdb_context *);
684                 ret = ctdb_tcp_init(ctdb);
685         }
686 #ifdef USE_INFINIBAND
687         if (strcmp(ctdb->transport, "ib") == 0) {
688                 int ctdb_ibw_init(struct ctdb_context *);
689                 ret = ctdb_ibw_init(ctdb);
690         }
691 #endif
692         if (ret != 0) {
693                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
694                 return -1;
695         }
696
697         if (ctdb->methods == NULL) {
698                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
699                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
700         }
701
702         /* initialise the transport  */
703         if (ctdb->methods->initialise(ctdb) != 0) {
704                 ctdb_fatal(ctdb, "transport failed to initialise");
705         }
706
707         /* attach to any existing persistent databases */
708         if (ctdb_attach_persistent(ctdb) != 0) {
709                 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
710         }
711
712         /* start frozen, then let the first election sort things out */
713         if (!ctdb_blocking_freeze(ctdb)) {
714                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
715         }
716
717         /* now start accepting clients, only can do this once frozen */
718         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
719                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
720                            ctdb_accept_client, ctdb);
721
722         /* tell all other nodes we've just started up */
723         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
724                                  0, CTDB_CONTROL_STARTUP, 0,
725                                  CTDB_CTRL_FLAG_NOREPLY,
726                                  tdb_null, NULL, NULL);
727
728         /* release any IPs we hold from previous runs of the daemon */
729         ctdb_release_all_ips(ctdb);
730
731         /* start the transport going */
732         ctdb_start_transport(ctdb);
733
734         /* go into a wait loop to allow other nodes to complete */
735         event_loop_wait(ctdb->ev);
736
737         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
738         exit(1);
739 }
740
741 /*
742   allocate a packet for use in daemon<->daemon communication
743  */
744 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
745                                                  TALLOC_CTX *mem_ctx, 
746                                                  enum ctdb_operation operation, 
747                                                  size_t length, size_t slength,
748                                                  const char *type)
749 {
750         int size;
751         struct ctdb_req_header *hdr;
752
753         length = MAX(length, slength);
754         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
755
756         if (ctdb->methods == NULL) {
757                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
758                          operation, (unsigned)length));
759                 return NULL;
760         }
761
762         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
763         if (hdr == NULL) {
764                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
765                          operation, (unsigned)length));
766                 return NULL;
767         }
768         talloc_set_name_const(hdr, type);
769         memset(hdr, 0, slength);
770         hdr->length       = length;
771         hdr->operation    = operation;
772         hdr->ctdb_magic   = CTDB_MAGIC;
773         hdr->ctdb_version = CTDB_VERSION;
774         hdr->generation   = ctdb->vnn_map->generation;
775         hdr->srcnode      = ctdb->pnn;
776
777         return hdr;     
778 }
779
780 struct daemon_control_state {
781         struct daemon_control_state *next, *prev;
782         struct ctdb_client *client;
783         struct ctdb_req_control *c;
784         uint32_t reqid;
785         struct ctdb_node *node;
786 };
787
788 /*
789   callback when a control reply comes in
790  */
791 static void daemon_control_callback(struct ctdb_context *ctdb,
792                                     int32_t status, TDB_DATA data, 
793                                     const char *errormsg,
794                                     void *private_data)
795 {
796         struct daemon_control_state *state = talloc_get_type(private_data, 
797                                                              struct daemon_control_state);
798         struct ctdb_client *client = state->client;
799         struct ctdb_reply_control *r;
800         size_t len;
801
802         /* construct a message to send to the client containing the data */
803         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
804         if (errormsg) {
805                 len += strlen(errormsg);
806         }
807         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
808                                struct ctdb_reply_control);
809         CTDB_NO_MEMORY_VOID(ctdb, r);
810
811         r->hdr.reqid     = state->reqid;
812         r->status        = status;
813         r->datalen       = data.dsize;
814         r->errorlen = 0;
815         memcpy(&r->data[0], data.dptr, data.dsize);
816         if (errormsg) {
817                 r->errorlen = strlen(errormsg);
818                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
819         }
820
821         daemon_queue_send(client, &r->hdr);
822
823         talloc_free(state);
824 }
825
826 /*
827   fail all pending controls to a disconnected node
828  */
829 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
830 {
831         struct daemon_control_state *state;
832         while ((state = node->pending_controls)) {
833                 DLIST_REMOVE(node->pending_controls, state);
834                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
835                                         "node is disconnected", state);
836         }
837 }
838
839 /*
840   destroy a daemon_control_state
841  */
842 static int daemon_control_destructor(struct daemon_control_state *state)
843 {
844         if (state->node) {
845                 DLIST_REMOVE(state->node->pending_controls, state);
846         }
847         return 0;
848 }
849
850 /*
851   this is called when the ctdb daemon received a ctdb request control
852   from a local client over the unix domain socket
853  */
854 static void daemon_request_control_from_client(struct ctdb_client *client, 
855                                                struct ctdb_req_control *c)
856 {
857         TDB_DATA data;
858         int res;
859         struct daemon_control_state *state;
860         TALLOC_CTX *tmp_ctx = talloc_new(client);
861
862         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
863                 c->hdr.destnode = client->ctdb->pnn;
864         }
865
866         state = talloc(client, struct daemon_control_state);
867         CTDB_NO_MEMORY_VOID(client->ctdb, state);
868
869         state->client = client;
870         state->c = talloc_steal(state, c);
871         state->reqid = c->hdr.reqid;
872         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
873                 state->node = client->ctdb->nodes[c->hdr.destnode];
874                 DLIST_ADD(state->node->pending_controls, state);
875         } else {
876                 state->node = NULL;
877         }
878
879         talloc_set_destructor(state, daemon_control_destructor);
880
881         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
882                 talloc_steal(tmp_ctx, state);
883         }
884         
885         data.dptr = &c->data[0];
886         data.dsize = c->datalen;
887         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
888                                        c->srvid, c->opcode, client->client_id,
889                                        c->flags,
890                                        data, daemon_control_callback,
891                                        state);
892         if (res != 0) {
893                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
894                          c->hdr.destnode));
895         }
896
897         talloc_free(tmp_ctx);
898 }
899
900 /*
901   register a call function
902 */
903 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
904                          ctdb_fn_t fn, int id)
905 {
906         struct ctdb_registered_call *call;
907         struct ctdb_db_context *ctdb_db;
908
909         ctdb_db = find_ctdb_db(ctdb, db_id);
910         if (ctdb_db == NULL) {
911                 return -1;
912         }
913
914         call = talloc(ctdb_db, struct ctdb_registered_call);
915         call->fn = fn;
916         call->id = id;
917
918         DLIST_ADD(ctdb_db->calls, call);        
919         return 0;
920 }
921
922
923
924 /*
925   this local messaging handler is ugly, but is needed to prevent
926   recursion in ctdb_send_message() when the destination node is the
927   same as the source node
928  */
929 struct ctdb_local_message {
930         struct ctdb_context *ctdb;
931         uint64_t srvid;
932         TDB_DATA data;
933 };
934
935 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
936                                        struct timeval t, void *private_data)
937 {
938         struct ctdb_local_message *m = talloc_get_type(private_data, 
939                                                        struct ctdb_local_message);
940         int res;
941
942         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
943         if (res != 0) {
944                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
945                           (unsigned long long)m->srvid));
946         }
947         talloc_free(m);
948 }
949
950 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
951 {
952         struct ctdb_local_message *m;
953         m = talloc(ctdb, struct ctdb_local_message);
954         CTDB_NO_MEMORY(ctdb, m);
955
956         m->ctdb = ctdb;
957         m->srvid = srvid;
958         m->data  = data;
959         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
960         if (m->data.dptr == NULL) {
961                 talloc_free(m);
962                 return -1;
963         }
964
965         /* this needs to be done as an event to prevent recursion */
966         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
967         return 0;
968 }
969
970 /*
971   send a ctdb message
972 */
973 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
974                              uint64_t srvid, TDB_DATA data)
975 {
976         struct ctdb_req_message *r;
977         int len;
978
979         /* see if this is a message to ourselves */
980         if (pnn == ctdb->pnn) {
981                 return ctdb_local_message(ctdb, srvid, data);
982         }
983
984         len = offsetof(struct ctdb_req_message, data) + data.dsize;
985         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
986                                     struct ctdb_req_message);
987         CTDB_NO_MEMORY(ctdb, r);
988
989         r->hdr.destnode  = pnn;
990         r->srvid         = srvid;
991         r->datalen       = data.dsize;
992         memcpy(&r->data[0], data.dptr, data.dsize);
993
994         ctdb_queue_packet(ctdb, &r->hdr);
995
996         talloc_free(r);
997         return 0;
998 }
999