ctdb: use mlockall, cautiously
[metze/ctdb/wip.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
104                         return 0;
105                 }
106         }
107         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
108 }
109
110 /*
111   message handler for when we are in daemon mode. This redirects the message
112   to the right client
113  */
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
115                                     TDB_DATA data, void *private_data)
116 {
117         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118         struct ctdb_req_message *r;
119         int len;
120
121         /* construct a message to send to the client containing the data */
122         len = offsetof(struct ctdb_req_message, data) + data.dsize;
123         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
124                                len, struct ctdb_req_message);
125         CTDB_NO_MEMORY_VOID(ctdb, r);
126
127         talloc_set_name_const(r, "req_message packet");
128
129         r->srvid         = srvid;
130         r->datalen       = data.dsize;
131         memcpy(&r->data[0], data.dptr, data.dsize);
132
133         daemon_queue_send(client, &r->hdr);
134
135         talloc_free(r);
136 }
137
138 /*
139   this is called when the ctdb daemon received a ctdb request to 
140   set the srvid from the client
141  */
142 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
143 {
144         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
145         int res;
146         if (client == NULL) {
147                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
148                 return -1;
149         }
150         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
151         if (res != 0) {
152                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
153                          (unsigned long long)srvid));
154         } else {
155                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
156                          (unsigned long long)srvid));
157         }
158
159         return res;
160 }
161
162 /*
163   this is called when the ctdb daemon received a ctdb request to 
164   remove a srvid from the client
165  */
166 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
167 {
168         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
169         if (client == NULL) {
170                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
171                 return -1;
172         }
173         return ctdb_deregister_message_handler(ctdb, srvid, client);
174 }
175
176
177 /*
178   destroy a ctdb_client
179 */
180 static int ctdb_client_destructor(struct ctdb_client *client)
181 {
182         struct ctdb_db_context *ctdb_db;
183
184         ctdb_takeover_client_destructor_hook(client);
185         ctdb_reqid_remove(client->ctdb, client->client_id);
186         if (client->ctdb->statistics.num_clients) {
187                 client->ctdb->statistics.num_clients--;
188         }
189
190         if (client->num_persistent_updates != 0) {
191                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
192                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
193         }
194         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
195         if (ctdb_db) {
196                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
197                                   "commit active. Forcing recovery.\n"));
198                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
199                 ctdb_db->transaction_active = false;
200         }
201
202         return 0;
203 }
204
205
206 /*
207   this is called when the ctdb daemon received a ctdb request message
208   from a local client over the unix domain socket
209  */
210 static void daemon_request_message_from_client(struct ctdb_client *client, 
211                                                struct ctdb_req_message *c)
212 {
213         TDB_DATA data;
214         int res;
215
216         /* maybe the message is for another client on this node */
217         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
218                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
219                 return;
220         }
221
222         /* its for a remote node */
223         data.dptr = &c->data[0];
224         data.dsize = c->datalen;
225         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
226                                        c->srvid, data);
227         if (res != 0) {
228                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
229                          c->hdr.destnode));
230         }
231 }
232
233
234 struct daemon_call_state {
235         struct ctdb_client *client;
236         uint32_t reqid;
237         struct ctdb_call *call;
238         struct timeval start_time;
239 };
240
241 /* 
242    complete a call from a client 
243 */
244 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
245 {
246         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
247                                                            struct daemon_call_state);
248         struct ctdb_reply_call *r;
249         int res;
250         uint32_t length;
251         struct ctdb_client *client = dstate->client;
252         struct ctdb_db_context *ctdb_db = state->ctdb_db;
253
254         talloc_steal(client, dstate);
255         talloc_steal(dstate, dstate->call);
256
257         res = ctdb_daemon_call_recv(state, dstate->call);
258         if (res != 0) {
259                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
260                 if (client->ctdb->statistics.pending_calls > 0) {
261                         client->ctdb->statistics.pending_calls--;
262                 }
263                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
264                 return;
265         }
266
267         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
268         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
269                                length, struct ctdb_reply_call);
270         if (r == NULL) {
271                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
272                 if (client->ctdb->statistics.pending_calls > 0) {
273                         client->ctdb->statistics.pending_calls--;
274                 }
275                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
276                 return;
277         }
278         r->hdr.reqid        = dstate->reqid;
279         r->datalen          = dstate->call->reply_data.dsize;
280         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
281
282         res = daemon_queue_send(client, &r->hdr);
283         if (res != 0) {
284                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
285         }
286         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
287         talloc_free(dstate);
288         if (client->ctdb->statistics.pending_calls > 0) {
289                 client->ctdb->statistics.pending_calls--;
290         }
291 }
292
293 struct ctdb_daemon_packet_wrap {
294         struct ctdb_context *ctdb;
295         uint32_t client_id;
296 };
297
298 /*
299   a wrapper to catch disconnected clients
300  */
301 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
302 {
303         struct ctdb_client *client;
304         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
305                                                             struct ctdb_daemon_packet_wrap);
306         if (w == NULL) {
307                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
308                 return;
309         }
310
311         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
312         if (client == NULL) {
313                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
314                          w->client_id));
315                 talloc_free(w);
316                 return;
317         }
318         talloc_free(w);
319
320         /* process it */
321         daemon_incoming_packet(client, hdr);    
322 }
323
324
325 /*
326   this is called when the ctdb daemon received a ctdb request call
327   from a local client over the unix domain socket
328  */
329 static void daemon_request_call_from_client(struct ctdb_client *client, 
330                                             struct ctdb_req_call *c)
331 {
332         struct ctdb_call_state *state;
333         struct ctdb_db_context *ctdb_db;
334         struct daemon_call_state *dstate;
335         struct ctdb_call *call;
336         struct ctdb_ltdb_header header;
337         TDB_DATA key, data;
338         int ret;
339         struct ctdb_context *ctdb = client->ctdb;
340         struct ctdb_daemon_packet_wrap *w;
341
342         ctdb->statistics.total_calls++;
343         if (client->ctdb->statistics.pending_calls > 0) {
344                 ctdb->statistics.pending_calls++;
345         }
346
347         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
348         if (!ctdb_db) {
349                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
350                           c->db_id));
351                 if (client->ctdb->statistics.pending_calls > 0) {
352                         ctdb->statistics.pending_calls--;
353                 }
354                 return;
355         }
356
357         key.dptr = c->data;
358         key.dsize = c->keylen;
359
360         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
361         CTDB_NO_MEMORY_VOID(ctdb, w);   
362
363         w->ctdb = ctdb;
364         w->client_id = client->client_id;
365
366         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
367                                            (struct ctdb_req_header *)c, &data,
368                                            daemon_incoming_packet_wrap, w, True);
369         if (ret == -2) {
370                 /* will retry later */
371                 if (client->ctdb->statistics.pending_calls > 0) {
372                         ctdb->statistics.pending_calls--;
373                 }
374                 return;
375         }
376
377         talloc_free(w);
378
379         if (ret != 0) {
380                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
381                 if (client->ctdb->statistics.pending_calls > 0) {
382                         ctdb->statistics.pending_calls--;
383                 }
384                 return;
385         }
386
387         dstate = talloc(client, struct daemon_call_state);
388         if (dstate == NULL) {
389                 ctdb_ltdb_unlock(ctdb_db, key);
390                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
391                 if (client->ctdb->statistics.pending_calls > 0) {
392                         ctdb->statistics.pending_calls--;
393                 }
394                 return;
395         }
396         dstate->start_time = timeval_current();
397         dstate->client = client;
398         dstate->reqid  = c->hdr.reqid;
399         talloc_steal(dstate, data.dptr);
400
401         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
402         if (call == NULL) {
403                 ctdb_ltdb_unlock(ctdb_db, key);
404                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
405                 if (client->ctdb->statistics.pending_calls > 0) {
406                         ctdb->statistics.pending_calls--;
407                 }
408                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
409                 return;
410         }
411
412         call->call_id = c->callid;
413         call->key = key;
414         call->call_data.dptr = c->data + c->keylen;
415         call->call_data.dsize = c->calldatalen;
416         call->flags = c->flags;
417
418         if (header.dmaster == ctdb->pnn) {
419                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
420         } else {
421                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
422         }
423
424         ctdb_ltdb_unlock(ctdb_db, key);
425
426         if (state == NULL) {
427                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
428                 if (client->ctdb->statistics.pending_calls > 0) {
429                         ctdb->statistics.pending_calls--;
430                 }
431                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
432                 return;
433         }
434         talloc_steal(state, dstate);
435         talloc_steal(client, state);
436
437         state->async.fn = daemon_call_from_client_callback;
438         state->async.private_data = dstate;
439 }
440
441
442 static void daemon_request_control_from_client(struct ctdb_client *client, 
443                                                struct ctdb_req_control *c);
444
445 /* data contains a packet from the client */
446 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
447 {
448         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
449         TALLOC_CTX *tmp_ctx;
450         struct ctdb_context *ctdb = client->ctdb;
451
452         /* place the packet as a child of a tmp_ctx. We then use
453            talloc_free() below to free it. If any of the calls want
454            to keep it, then they will steal it somewhere else, and the
455            talloc_free() will be a no-op */
456         tmp_ctx = talloc_new(client);
457         talloc_steal(tmp_ctx, hdr);
458
459         if (hdr->ctdb_magic != CTDB_MAGIC) {
460                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
461                 goto done;
462         }
463
464         if (hdr->ctdb_version != CTDB_VERSION) {
465                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
466                 goto done;
467         }
468
469         switch (hdr->operation) {
470         case CTDB_REQ_CALL:
471                 ctdb->statistics.client.req_call++;
472                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
473                 break;
474
475         case CTDB_REQ_MESSAGE:
476                 ctdb->statistics.client.req_message++;
477                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
478                 break;
479
480         case CTDB_REQ_CONTROL:
481                 ctdb->statistics.client.req_control++;
482                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
483                 break;
484
485         default:
486                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
487                          hdr->operation));
488         }
489
490 done:
491         talloc_free(tmp_ctx);
492 }
493
494 /*
495   called when the daemon gets a incoming packet
496  */
497 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
498 {
499         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
500         struct ctdb_req_header *hdr;
501
502         if (cnt == 0) {
503                 talloc_free(client);
504                 return;
505         }
506
507         client->ctdb->statistics.client_packets_recv++;
508
509         if (cnt < sizeof(*hdr)) {
510                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
511                                (unsigned)cnt);
512                 return;
513         }
514         hdr = (struct ctdb_req_header *)data;
515         if (cnt != hdr->length) {
516                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
517                                (unsigned)hdr->length, (unsigned)cnt);
518                 return;
519         }
520
521         if (hdr->ctdb_magic != CTDB_MAGIC) {
522                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
523                 return;
524         }
525
526         if (hdr->ctdb_version != CTDB_VERSION) {
527                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
528                 return;
529         }
530
531         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
532                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
533                  hdr->srcnode, hdr->destnode));
534
535         /* it is the responsibility of the incoming packet function to free 'data' */
536         daemon_incoming_packet(client, hdr);
537 }
538
539
540 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
541 {
542         if (client_pid->ctdb->client_pids != NULL) {
543                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
544         }
545
546         return 0;
547 }
548
549
550 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
551                          uint16_t flags, void *private_data)
552 {
553         struct sockaddr_un addr;
554         socklen_t len;
555         int fd;
556         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
557         struct ctdb_client *client;
558         struct ctdb_client_pid_list *client_pid;
559 #ifdef _AIX
560         struct peercred_struct cr;
561         socklen_t crl = sizeof(struct peercred_struct);
562 #else
563         struct ucred cr;
564         socklen_t crl = sizeof(struct ucred);
565 #endif
566
567         memset(&addr, 0, sizeof(addr));
568         len = sizeof(addr);
569         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
570         if (fd == -1) {
571                 return;
572         }
573
574         set_nonblocking(fd);
575         set_close_on_exec(fd);
576
577         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
578
579         client = talloc_zero(ctdb, struct ctdb_client);
580 #ifdef _AIX
581         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
582 #else
583         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
584 #endif
585                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
586         }
587
588         client->ctdb = ctdb;
589         client->fd = fd;
590         client->client_id = ctdb_reqid_new(ctdb, client);
591         client->pid = cr.pid;
592
593         client_pid = talloc(client, struct ctdb_client_pid_list);
594         if (client_pid == NULL) {
595                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
596                 close(fd);
597                 talloc_free(client);
598                 return;
599         }               
600         client_pid->ctdb   = ctdb;
601         client_pid->pid    = cr.pid;
602         client_pid->client = client;
603
604         DLIST_ADD(ctdb->client_pids, client_pid);
605
606         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
607                                          ctdb_daemon_read_cb, client);
608
609         talloc_set_destructor(client, ctdb_client_destructor);
610         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
611         ctdb->statistics.num_clients++;
612 }
613
614
615
616 /*
617   create a unix domain socket and bind it
618   return a file descriptor open on the socket 
619 */
620 static int ux_socket_bind(struct ctdb_context *ctdb)
621 {
622         struct sockaddr_un addr;
623
624         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
625         if (ctdb->daemon.sd == -1) {
626                 return -1;
627         }
628
629         set_close_on_exec(ctdb->daemon.sd);
630         set_nonblocking(ctdb->daemon.sd);
631
632         memset(&addr, 0, sizeof(addr));
633         addr.sun_family = AF_UNIX;
634         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
635
636         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
637                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
638                 goto failed;
639         }       
640
641         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
642             chmod(ctdb->daemon.name, 0700) != 0) {
643                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
644                 goto failed;
645         } 
646
647
648         if (listen(ctdb->daemon.sd, 100) != 0) {
649                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
650                 goto failed;
651         }
652
653         return 0;
654
655 failed:
656         close(ctdb->daemon.sd);
657         ctdb->daemon.sd = -1;
658         return -1;      
659 }
660
661 static void sig_child_handler(struct event_context *ev,
662         struct signal_event *se, int signum, int count,
663         void *dont_care, 
664         void *private_data)
665 {
666 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
667         int status;
668         pid_t pid = -1;
669
670         while (pid != 0) {
671                 pid = waitpid(-1, &status, WNOHANG);
672                 if (pid == -1) {
673                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
674                         return;
675                 }
676                 if (pid > 0) {
677                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
678                 }
679         }
680 }
681
682 /*
683   start the protocol going as a daemon
684 */
685 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
686 {
687         int res, ret = -1;
688         struct fd_event *fde;
689         const char *domain_socket_name;
690         struct signal_event *se;
691
692         /* get rid of any old sockets */
693         unlink(ctdb->daemon.name);
694
695         /* create a unix domain stream socket to listen to */
696         res = ux_socket_bind(ctdb);
697         if (res!=0) {
698                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
699                 exit(10);
700         }
701
702         if (do_fork && fork()) {
703                 return 0;
704         }
705
706         tdb_reopen_all(False);
707
708         if (do_fork) {
709                 setsid();
710                 close(0);
711                 if (open("/dev/null", O_RDONLY) != 0) {
712                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
713                         exit(11);
714                 }
715         }
716         block_signal(SIGPIPE);
717
718         ctdbd_pid = getpid();
719
720
721         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
722
723         ctdb_high_priority(ctdb);
724
725         /* ensure the socket is deleted on exit of the daemon */
726         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
727         if (domain_socket_name == NULL) {
728                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
729                 exit(12);
730         }
731
732         ctdb->ev = event_context_init(NULL);
733
734         ctdb_set_child_logging(ctdb);
735
736         /* force initial recovery for election */
737         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
738
739         if (strcmp(ctdb->transport, "tcp") == 0) {
740                 int ctdb_tcp_init(struct ctdb_context *);
741                 ret = ctdb_tcp_init(ctdb);
742         }
743 #ifdef USE_INFINIBAND
744         if (strcmp(ctdb->transport, "ib") == 0) {
745                 int ctdb_ibw_init(struct ctdb_context *);
746                 ret = ctdb_ibw_init(ctdb);
747         }
748 #endif
749         if (ret != 0) {
750                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
751                 return -1;
752         }
753
754         if (ctdb->methods == NULL) {
755                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
756                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
757         }
758
759         /* initialise the transport  */
760         if (ctdb->methods->initialise(ctdb) != 0) {
761                 ctdb_fatal(ctdb, "transport failed to initialise");
762         }
763
764         /* attach to any existing persistent databases */
765         if (ctdb_attach_persistent(ctdb) != 0) {
766                 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
767         }
768
769         /* start frozen, then let the first election sort things out */
770         if (ctdb_blocking_freeze(ctdb)) {
771                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
772         }
773
774         /* now start accepting clients, only can do this once frozen */
775         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
776                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
777                            ctdb_accept_client, ctdb);
778
779         /* tell all other nodes we've just started up */
780         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
781                                  0, CTDB_CONTROL_STARTUP, 0,
782                                  CTDB_CTRL_FLAG_NOREPLY,
783                                  tdb_null, NULL, NULL);
784
785         /* release any IPs we hold from previous runs of the daemon */
786         ctdb_release_all_ips(ctdb);
787
788         /* start the transport going */
789         ctdb_start_transport(ctdb);
790
791         /* set up a handler to pick up sigchld */
792         se = event_add_signal(ctdb->ev, ctdb,
793                                      SIGCHLD, 0,
794                                      sig_child_handler,
795                                      ctdb);
796         if (se == NULL) {
797                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
798                 exit(1);
799         }
800
801         if (use_syslog) {
802                 if (start_syslog_daemon(ctdb)) {
803                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
804                         exit(10);
805                 }
806         }
807
808         ctdb_lockdown_memory(ctdb);
809           
810         /* go into a wait loop to allow other nodes to complete */
811         event_loop_wait(ctdb->ev);
812
813         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
814         exit(1);
815 }
816
817 /*
818   allocate a packet for use in daemon<->daemon communication
819  */
820 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
821                                                  TALLOC_CTX *mem_ctx, 
822                                                  enum ctdb_operation operation, 
823                                                  size_t length, size_t slength,
824                                                  const char *type)
825 {
826         int size;
827         struct ctdb_req_header *hdr;
828
829         length = MAX(length, slength);
830         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
831
832         if (ctdb->methods == NULL) {
833                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
834                          operation, (unsigned)length));
835                 return NULL;
836         }
837
838         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
839         if (hdr == NULL) {
840                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
841                          operation, (unsigned)length));
842                 return NULL;
843         }
844         talloc_set_name_const(hdr, type);
845         memset(hdr, 0, slength);
846         hdr->length       = length;
847         hdr->operation    = operation;
848         hdr->ctdb_magic   = CTDB_MAGIC;
849         hdr->ctdb_version = CTDB_VERSION;
850         hdr->generation   = ctdb->vnn_map->generation;
851         hdr->srcnode      = ctdb->pnn;
852
853         return hdr;     
854 }
855
856 struct daemon_control_state {
857         struct daemon_control_state *next, *prev;
858         struct ctdb_client *client;
859         struct ctdb_req_control *c;
860         uint32_t reqid;
861         struct ctdb_node *node;
862 };
863
864 /*
865   callback when a control reply comes in
866  */
867 static void daemon_control_callback(struct ctdb_context *ctdb,
868                                     int32_t status, TDB_DATA data, 
869                                     const char *errormsg,
870                                     void *private_data)
871 {
872         struct daemon_control_state *state = talloc_get_type(private_data, 
873                                                              struct daemon_control_state);
874         struct ctdb_client *client = state->client;
875         struct ctdb_reply_control *r;
876         size_t len;
877
878         /* construct a message to send to the client containing the data */
879         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
880         if (errormsg) {
881                 len += strlen(errormsg);
882         }
883         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
884                                struct ctdb_reply_control);
885         CTDB_NO_MEMORY_VOID(ctdb, r);
886
887         r->hdr.reqid     = state->reqid;
888         r->status        = status;
889         r->datalen       = data.dsize;
890         r->errorlen = 0;
891         memcpy(&r->data[0], data.dptr, data.dsize);
892         if (errormsg) {
893                 r->errorlen = strlen(errormsg);
894                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
895         }
896
897         daemon_queue_send(client, &r->hdr);
898
899         talloc_free(state);
900 }
901
902 /*
903   fail all pending controls to a disconnected node
904  */
905 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
906 {
907         struct daemon_control_state *state;
908         while ((state = node->pending_controls)) {
909                 DLIST_REMOVE(node->pending_controls, state);
910                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
911                                         "node is disconnected", state);
912         }
913 }
914
915 /*
916   destroy a daemon_control_state
917  */
918 static int daemon_control_destructor(struct daemon_control_state *state)
919 {
920         if (state->node) {
921                 DLIST_REMOVE(state->node->pending_controls, state);
922         }
923         return 0;
924 }
925
926 /*
927   this is called when the ctdb daemon received a ctdb request control
928   from a local client over the unix domain socket
929  */
930 static void daemon_request_control_from_client(struct ctdb_client *client, 
931                                                struct ctdb_req_control *c)
932 {
933         TDB_DATA data;
934         int res;
935         struct daemon_control_state *state;
936         TALLOC_CTX *tmp_ctx = talloc_new(client);
937
938         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
939                 c->hdr.destnode = client->ctdb->pnn;
940         }
941
942         state = talloc(client, struct daemon_control_state);
943         CTDB_NO_MEMORY_VOID(client->ctdb, state);
944
945         state->client = client;
946         state->c = talloc_steal(state, c);
947         state->reqid = c->hdr.reqid;
948         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
949                 state->node = client->ctdb->nodes[c->hdr.destnode];
950                 DLIST_ADD(state->node->pending_controls, state);
951         } else {
952                 state->node = NULL;
953         }
954
955         talloc_set_destructor(state, daemon_control_destructor);
956
957         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
958                 talloc_steal(tmp_ctx, state);
959         }
960         
961         data.dptr = &c->data[0];
962         data.dsize = c->datalen;
963         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
964                                        c->srvid, c->opcode, client->client_id,
965                                        c->flags,
966                                        data, daemon_control_callback,
967                                        state);
968         if (res != 0) {
969                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
970                          c->hdr.destnode));
971         }
972
973         talloc_free(tmp_ctx);
974 }
975
976 /*
977   register a call function
978 */
979 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
980                          ctdb_fn_t fn, int id)
981 {
982         struct ctdb_registered_call *call;
983         struct ctdb_db_context *ctdb_db;
984
985         ctdb_db = find_ctdb_db(ctdb, db_id);
986         if (ctdb_db == NULL) {
987                 return -1;
988         }
989
990         call = talloc(ctdb_db, struct ctdb_registered_call);
991         call->fn = fn;
992         call->id = id;
993
994         DLIST_ADD(ctdb_db->calls, call);        
995         return 0;
996 }
997
998
999
1000 /*
1001   this local messaging handler is ugly, but is needed to prevent
1002   recursion in ctdb_send_message() when the destination node is the
1003   same as the source node
1004  */
1005 struct ctdb_local_message {
1006         struct ctdb_context *ctdb;
1007         uint64_t srvid;
1008         TDB_DATA data;
1009 };
1010
1011 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1012                                        struct timeval t, void *private_data)
1013 {
1014         struct ctdb_local_message *m = talloc_get_type(private_data, 
1015                                                        struct ctdb_local_message);
1016         int res;
1017
1018         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1019         if (res != 0) {
1020                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1021                           (unsigned long long)m->srvid));
1022         }
1023         talloc_free(m);
1024 }
1025
1026 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1027 {
1028         struct ctdb_local_message *m;
1029         m = talloc(ctdb, struct ctdb_local_message);
1030         CTDB_NO_MEMORY(ctdb, m);
1031
1032         m->ctdb = ctdb;
1033         m->srvid = srvid;
1034         m->data  = data;
1035         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1036         if (m->data.dptr == NULL) {
1037                 talloc_free(m);
1038                 return -1;
1039         }
1040
1041         /* this needs to be done as an event to prevent recursion */
1042         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1043         return 0;
1044 }
1045
1046 /*
1047   send a ctdb message
1048 */
1049 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1050                              uint64_t srvid, TDB_DATA data)
1051 {
1052         struct ctdb_req_message *r;
1053         int len;
1054
1055         if (ctdb->methods == NULL) {
1056                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1057                 return -1;
1058         }
1059
1060         /* see if this is a message to ourselves */
1061         if (pnn == ctdb->pnn) {
1062                 return ctdb_local_message(ctdb, srvid, data);
1063         }
1064
1065         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1066         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1067                                     struct ctdb_req_message);
1068         CTDB_NO_MEMORY(ctdb, r);
1069
1070         r->hdr.destnode  = pnn;
1071         r->srvid         = srvid;
1072         r->datalen       = data.dsize;
1073         memcpy(&r->data[0], data.dptr, data.dsize);
1074
1075         ctdb_queue_packet(ctdb, &r->hdr);
1076
1077         talloc_free(r);
1078         return 0;
1079 }
1080
1081
1082
1083 struct ctdb_client_notify_list {
1084         struct ctdb_client_notify_list *next, *prev;
1085         struct ctdb_context *ctdb;
1086         uint64_t srvid;
1087         TDB_DATA data;
1088 };
1089
1090
1091 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1092 {
1093         int ret;
1094
1095         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1096
1097         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1098         if (ret != 0) {
1099                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1100         }
1101
1102         return 0;
1103 }
1104
1105 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1106 {
1107         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1108         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1109         struct ctdb_client_notify_list *nl;
1110
1111         DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1112
1113         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1114                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1115                 return -1;
1116         }
1117
1118         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1119                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1120                 return -1;
1121         }
1122
1123
1124         if (client == NULL) {
1125                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1126                 return -1;
1127         }
1128
1129         for(nl=client->notify; nl; nl=nl->next) {
1130                 if (nl->srvid == notify->srvid) {
1131                         break;
1132                 }
1133         }
1134         if (nl != NULL) {
1135                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1136                 return -1;
1137         }
1138
1139         nl = talloc(client, struct ctdb_client_notify_list);
1140         CTDB_NO_MEMORY(ctdb, nl);
1141         nl->ctdb       = ctdb;
1142         nl->srvid      = notify->srvid;
1143         nl->data.dsize = notify->len;
1144         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1145         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1146         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1147         
1148         DLIST_ADD(client->notify, nl);
1149         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1150
1151         return 0;
1152 }
1153
1154 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1155 {
1156         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1157         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1158         struct ctdb_client_notify_list *nl;
1159
1160         DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1161
1162         if (client == NULL) {
1163                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1164                 return -1;
1165         }
1166
1167         for(nl=client->notify; nl; nl=nl->next) {
1168                 if (nl->srvid == notify->srvid) {
1169                         break;
1170                 }
1171         }
1172         if (nl == NULL) {
1173                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1174                 return -1;
1175         }
1176
1177         DLIST_REMOVE(client->notify, nl);
1178         talloc_set_destructor(nl, NULL);
1179         talloc_free(nl);
1180
1181         return 0;
1182 }
1183
1184 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1185 {
1186         struct ctdb_client_pid_list *client_pid;
1187
1188         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1189                 if (client_pid->pid == pid) {
1190                         return client_pid->client;
1191                 }
1192         }
1193         return NULL;
1194 }
1195
1196
1197 /* This control is used by samba when probing if a process (of a samba daemon)
1198    exists on the node.
1199    Samba does this when it needs/wants to check if a subrecord in one of the
1200    databases is still valied, or if it is stale and can be removed.
1201    If the node is in unhealthy or stopped state we just kill of the samba
1202    process holding htis sub-record and return to the calling samba that
1203    the process does not exist.
1204    This allows us to forcefully recall subrecords registered by samba processes
1205    on banned and stopped nodes.
1206 */
1207 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1208 {
1209         struct ctdb_client *client;
1210
1211         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1212                 client = ctdb_find_client_by_pid(ctdb, pid);
1213                 if (client != NULL) {
1214                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1215                         talloc_free(client);
1216                 }
1217                 return -1;
1218         }
1219
1220         return kill(pid, 0);
1221 }