server: add "init" event
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
104                         return 0;
105                 }
106         }
107         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
108 }
109
110 /*
111   message handler for when we are in daemon mode. This redirects the message
112   to the right client
113  */
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
115                                     TDB_DATA data, void *private_data)
116 {
117         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118         struct ctdb_req_message *r;
119         int len;
120
121         /* construct a message to send to the client containing the data */
122         len = offsetof(struct ctdb_req_message, data) + data.dsize;
123         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
124                                len, struct ctdb_req_message);
125         CTDB_NO_MEMORY_VOID(ctdb, r);
126
127         talloc_set_name_const(r, "req_message packet");
128
129         r->srvid         = srvid;
130         r->datalen       = data.dsize;
131         memcpy(&r->data[0], data.dptr, data.dsize);
132
133         daemon_queue_send(client, &r->hdr);
134
135         talloc_free(r);
136 }
137
138 /*
139   this is called when the ctdb daemon received a ctdb request to 
140   set the srvid from the client
141  */
142 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
143 {
144         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
145         int res;
146         if (client == NULL) {
147                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
148                 return -1;
149         }
150         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
151         if (res != 0) {
152                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
153                          (unsigned long long)srvid));
154         } else {
155                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
156                          (unsigned long long)srvid));
157         }
158
159         return res;
160 }
161
162 /*
163   this is called when the ctdb daemon received a ctdb request to 
164   remove a srvid from the client
165  */
166 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
167 {
168         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
169         if (client == NULL) {
170                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
171                 return -1;
172         }
173         return ctdb_deregister_message_handler(ctdb, srvid, client);
174 }
175
176
177 /*
178   destroy a ctdb_client
179 */
180 static int ctdb_client_destructor(struct ctdb_client *client)
181 {
182         struct ctdb_db_context *ctdb_db;
183
184         ctdb_takeover_client_destructor_hook(client);
185         ctdb_reqid_remove(client->ctdb, client->client_id);
186         if (client->ctdb->statistics.num_clients) {
187                 client->ctdb->statistics.num_clients--;
188         }
189
190         if (client->num_persistent_updates != 0) {
191                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
192                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
193         }
194         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
195         if (ctdb_db) {
196                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
197                                   "commit active. Forcing recovery.\n"));
198                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
199                 ctdb_db->transaction_active = false;
200         }
201
202         return 0;
203 }
204
205
206 /*
207   this is called when the ctdb daemon received a ctdb request message
208   from a local client over the unix domain socket
209  */
210 static void daemon_request_message_from_client(struct ctdb_client *client, 
211                                                struct ctdb_req_message *c)
212 {
213         TDB_DATA data;
214         int res;
215
216         /* maybe the message is for another client on this node */
217         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
218                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
219                 return;
220         }
221
222         /* its for a remote node */
223         data.dptr = &c->data[0];
224         data.dsize = c->datalen;
225         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
226                                        c->srvid, data);
227         if (res != 0) {
228                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
229                          c->hdr.destnode));
230         }
231 }
232
233
234 struct daemon_call_state {
235         struct ctdb_client *client;
236         uint32_t reqid;
237         struct ctdb_call *call;
238         struct timeval start_time;
239 };
240
241 /* 
242    complete a call from a client 
243 */
244 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
245 {
246         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
247                                                            struct daemon_call_state);
248         struct ctdb_reply_call *r;
249         int res;
250         uint32_t length;
251         struct ctdb_client *client = dstate->client;
252         struct ctdb_db_context *ctdb_db = state->ctdb_db;
253
254         talloc_steal(client, dstate);
255         talloc_steal(dstate, dstate->call);
256
257         res = ctdb_daemon_call_recv(state, dstate->call);
258         if (res != 0) {
259                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
260                 if (client->ctdb->statistics.pending_calls > 0) {
261                         client->ctdb->statistics.pending_calls--;
262                 }
263                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
264                 return;
265         }
266
267         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
268         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
269                                length, struct ctdb_reply_call);
270         if (r == NULL) {
271                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
272                 if (client->ctdb->statistics.pending_calls > 0) {
273                         client->ctdb->statistics.pending_calls--;
274                 }
275                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
276                 return;
277         }
278         r->hdr.reqid        = dstate->reqid;
279         r->datalen          = dstate->call->reply_data.dsize;
280         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
281
282         res = daemon_queue_send(client, &r->hdr);
283         if (res != 0) {
284                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
285         }
286         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
287         talloc_free(dstate);
288         if (client->ctdb->statistics.pending_calls > 0) {
289                 client->ctdb->statistics.pending_calls--;
290         }
291 }
292
293 struct ctdb_daemon_packet_wrap {
294         struct ctdb_context *ctdb;
295         uint32_t client_id;
296 };
297
298 /*
299   a wrapper to catch disconnected clients
300  */
301 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
302 {
303         struct ctdb_client *client;
304         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
305                                                             struct ctdb_daemon_packet_wrap);
306         if (w == NULL) {
307                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
308                 return;
309         }
310
311         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
312         if (client == NULL) {
313                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
314                          w->client_id));
315                 talloc_free(w);
316                 return;
317         }
318         talloc_free(w);
319
320         /* process it */
321         daemon_incoming_packet(client, hdr);    
322 }
323
324
325 /*
326   this is called when the ctdb daemon received a ctdb request call
327   from a local client over the unix domain socket
328  */
329 static void daemon_request_call_from_client(struct ctdb_client *client, 
330                                             struct ctdb_req_call *c)
331 {
332         struct ctdb_call_state *state;
333         struct ctdb_db_context *ctdb_db;
334         struct daemon_call_state *dstate;
335         struct ctdb_call *call;
336         struct ctdb_ltdb_header header;
337         TDB_DATA key, data;
338         int ret;
339         struct ctdb_context *ctdb = client->ctdb;
340         struct ctdb_daemon_packet_wrap *w;
341
342         ctdb->statistics.total_calls++;
343         if (client->ctdb->statistics.pending_calls > 0) {
344                 ctdb->statistics.pending_calls++;
345         }
346
347         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
348         if (!ctdb_db) {
349                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
350                           c->db_id));
351                 if (client->ctdb->statistics.pending_calls > 0) {
352                         ctdb->statistics.pending_calls--;
353                 }
354                 return;
355         }
356
357         if (ctdb_db->unhealthy_reason) {
358                 /*
359                  * this is just a warning, as the tdb should be empty anyway,
360                  * and only persistent databases can be unhealthy, which doesn't
361                  * use this code patch
362                  */
363                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
364                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
365         }
366
367         key.dptr = c->data;
368         key.dsize = c->keylen;
369
370         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
371         CTDB_NO_MEMORY_VOID(ctdb, w);   
372
373         w->ctdb = ctdb;
374         w->client_id = client->client_id;
375
376         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
377                                            (struct ctdb_req_header *)c, &data,
378                                            daemon_incoming_packet_wrap, w, True);
379         if (ret == -2) {
380                 /* will retry later */
381                 if (client->ctdb->statistics.pending_calls > 0) {
382                         ctdb->statistics.pending_calls--;
383                 }
384                 return;
385         }
386
387         talloc_free(w);
388
389         if (ret != 0) {
390                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
391                 if (client->ctdb->statistics.pending_calls > 0) {
392                         ctdb->statistics.pending_calls--;
393                 }
394                 return;
395         }
396
397         dstate = talloc(client, struct daemon_call_state);
398         if (dstate == NULL) {
399                 ctdb_ltdb_unlock(ctdb_db, key);
400                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
401                 if (client->ctdb->statistics.pending_calls > 0) {
402                         ctdb->statistics.pending_calls--;
403                 }
404                 return;
405         }
406         dstate->start_time = timeval_current();
407         dstate->client = client;
408         dstate->reqid  = c->hdr.reqid;
409         talloc_steal(dstate, data.dptr);
410
411         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
412         if (call == NULL) {
413                 ctdb_ltdb_unlock(ctdb_db, key);
414                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
415                 if (client->ctdb->statistics.pending_calls > 0) {
416                         ctdb->statistics.pending_calls--;
417                 }
418                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
419                 return;
420         }
421
422         call->call_id = c->callid;
423         call->key = key;
424         call->call_data.dptr = c->data + c->keylen;
425         call->call_data.dsize = c->calldatalen;
426         call->flags = c->flags;
427
428         if (header.dmaster == ctdb->pnn) {
429                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
430         } else {
431                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
432         }
433
434         ctdb_ltdb_unlock(ctdb_db, key);
435
436         if (state == NULL) {
437                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
438                 if (client->ctdb->statistics.pending_calls > 0) {
439                         ctdb->statistics.pending_calls--;
440                 }
441                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
442                 return;
443         }
444         talloc_steal(state, dstate);
445         talloc_steal(client, state);
446
447         state->async.fn = daemon_call_from_client_callback;
448         state->async.private_data = dstate;
449 }
450
451
452 static void daemon_request_control_from_client(struct ctdb_client *client, 
453                                                struct ctdb_req_control *c);
454
455 /* data contains a packet from the client */
456 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
457 {
458         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
459         TALLOC_CTX *tmp_ctx;
460         struct ctdb_context *ctdb = client->ctdb;
461
462         /* place the packet as a child of a tmp_ctx. We then use
463            talloc_free() below to free it. If any of the calls want
464            to keep it, then they will steal it somewhere else, and the
465            talloc_free() will be a no-op */
466         tmp_ctx = talloc_new(client);
467         talloc_steal(tmp_ctx, hdr);
468
469         if (hdr->ctdb_magic != CTDB_MAGIC) {
470                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
471                 goto done;
472         }
473
474         if (hdr->ctdb_version != CTDB_VERSION) {
475                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
476                 goto done;
477         }
478
479         switch (hdr->operation) {
480         case CTDB_REQ_CALL:
481                 ctdb->statistics.client.req_call++;
482                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
483                 break;
484
485         case CTDB_REQ_MESSAGE:
486                 ctdb->statistics.client.req_message++;
487                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
488                 break;
489
490         case CTDB_REQ_CONTROL:
491                 ctdb->statistics.client.req_control++;
492                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
493                 break;
494
495         default:
496                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
497                          hdr->operation));
498         }
499
500 done:
501         talloc_free(tmp_ctx);
502 }
503
504 /*
505   called when the daemon gets a incoming packet
506  */
507 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
508 {
509         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
510         struct ctdb_req_header *hdr;
511
512         if (cnt == 0) {
513                 talloc_free(client);
514                 return;
515         }
516
517         client->ctdb->statistics.client_packets_recv++;
518
519         if (cnt < sizeof(*hdr)) {
520                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
521                                (unsigned)cnt);
522                 return;
523         }
524         hdr = (struct ctdb_req_header *)data;
525         if (cnt != hdr->length) {
526                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
527                                (unsigned)hdr->length, (unsigned)cnt);
528                 return;
529         }
530
531         if (hdr->ctdb_magic != CTDB_MAGIC) {
532                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
533                 return;
534         }
535
536         if (hdr->ctdb_version != CTDB_VERSION) {
537                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
538                 return;
539         }
540
541         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
542                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
543                  hdr->srcnode, hdr->destnode));
544
545         /* it is the responsibility of the incoming packet function to free 'data' */
546         daemon_incoming_packet(client, hdr);
547 }
548
549
550 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
551 {
552         if (client_pid->ctdb->client_pids != NULL) {
553                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
554         }
555
556         return 0;
557 }
558
559
560 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
561                          uint16_t flags, void *private_data)
562 {
563         struct sockaddr_un addr;
564         socklen_t len;
565         int fd;
566         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
567         struct ctdb_client *client;
568         struct ctdb_client_pid_list *client_pid;
569 #ifdef _AIX
570         struct peercred_struct cr;
571         socklen_t crl = sizeof(struct peercred_struct);
572 #else
573         struct ucred cr;
574         socklen_t crl = sizeof(struct ucred);
575 #endif
576
577         memset(&addr, 0, sizeof(addr));
578         len = sizeof(addr);
579         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
580         if (fd == -1) {
581                 return;
582         }
583
584         set_nonblocking(fd);
585         set_close_on_exec(fd);
586
587         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
588
589         client = talloc_zero(ctdb, struct ctdb_client);
590 #ifdef _AIX
591         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
592 #else
593         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
594 #endif
595                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
596         }
597
598         client->ctdb = ctdb;
599         client->fd = fd;
600         client->client_id = ctdb_reqid_new(ctdb, client);
601         client->pid = cr.pid;
602
603         client_pid = talloc(client, struct ctdb_client_pid_list);
604         if (client_pid == NULL) {
605                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
606                 close(fd);
607                 talloc_free(client);
608                 return;
609         }               
610         client_pid->ctdb   = ctdb;
611         client_pid->pid    = cr.pid;
612         client_pid->client = client;
613
614         DLIST_ADD(ctdb->client_pids, client_pid);
615
616         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
617                                          ctdb_daemon_read_cb, client);
618
619         talloc_set_destructor(client, ctdb_client_destructor);
620         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
621         ctdb->statistics.num_clients++;
622 }
623
624
625
626 /*
627   create a unix domain socket and bind it
628   return a file descriptor open on the socket 
629 */
630 static int ux_socket_bind(struct ctdb_context *ctdb)
631 {
632         struct sockaddr_un addr;
633
634         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
635         if (ctdb->daemon.sd == -1) {
636                 return -1;
637         }
638
639         set_close_on_exec(ctdb->daemon.sd);
640         set_nonblocking(ctdb->daemon.sd);
641
642         memset(&addr, 0, sizeof(addr));
643         addr.sun_family = AF_UNIX;
644         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
645
646         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
647                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
648                 goto failed;
649         }       
650
651         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
652             chmod(ctdb->daemon.name, 0700) != 0) {
653                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
654                 goto failed;
655         } 
656
657
658         if (listen(ctdb->daemon.sd, 100) != 0) {
659                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
660                 goto failed;
661         }
662
663         return 0;
664
665 failed:
666         close(ctdb->daemon.sd);
667         ctdb->daemon.sd = -1;
668         return -1;      
669 }
670
671 static void sig_child_handler(struct event_context *ev,
672         struct signal_event *se, int signum, int count,
673         void *dont_care, 
674         void *private_data)
675 {
676 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
677         int status;
678         pid_t pid = -1;
679
680         while (pid != 0) {
681                 pid = waitpid(-1, &status, WNOHANG);
682                 if (pid == -1) {
683                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
684                         return;
685                 }
686                 if (pid > 0) {
687                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
688                 }
689         }
690 }
691
692 /*
693   start the protocol going as a daemon
694 */
695 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
696 {
697         int res, ret = -1;
698         struct fd_event *fde;
699         const char *domain_socket_name;
700         struct signal_event *se;
701
702         /* get rid of any old sockets */
703         unlink(ctdb->daemon.name);
704
705         /* create a unix domain stream socket to listen to */
706         res = ux_socket_bind(ctdb);
707         if (res!=0) {
708                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
709                 exit(10);
710         }
711
712         if (do_fork && fork()) {
713                 return 0;
714         }
715
716         tdb_reopen_all(False);
717
718         if (do_fork) {
719                 setsid();
720                 close(0);
721                 if (open("/dev/null", O_RDONLY) != 0) {
722                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
723                         exit(11);
724                 }
725         }
726         block_signal(SIGPIPE);
727
728         ctdbd_pid = getpid();
729
730
731         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
732
733         ctdb_high_priority(ctdb);
734
735         /* ensure the socket is deleted on exit of the daemon */
736         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
737         if (domain_socket_name == NULL) {
738                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
739                 exit(12);
740         }
741
742         ctdb->ev = event_context_init(NULL);
743
744         ctdb_set_child_logging(ctdb);
745
746         /* force initial recovery for election */
747         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
748
749         if (strcmp(ctdb->transport, "tcp") == 0) {
750                 int ctdb_tcp_init(struct ctdb_context *);
751                 ret = ctdb_tcp_init(ctdb);
752         }
753 #ifdef USE_INFINIBAND
754         if (strcmp(ctdb->transport, "ib") == 0) {
755                 int ctdb_ibw_init(struct ctdb_context *);
756                 ret = ctdb_ibw_init(ctdb);
757         }
758 #endif
759         if (ret != 0) {
760                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
761                 return -1;
762         }
763
764         if (ctdb->methods == NULL) {
765                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
766                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
767         }
768
769         /* initialise the transport  */
770         if (ctdb->methods->initialise(ctdb) != 0) {
771                 ctdb_fatal(ctdb, "transport failed to initialise");
772         }
773
774         /* attach to existing databases */
775         if (ctdb_attach_databases(ctdb) != 0) {
776                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
777         }
778
779         /* start frozen, then let the first election sort things out */
780         if (ctdb_blocking_freeze(ctdb)) {
781                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
782         }
783
784         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
785         if (ret != 0) {
786                 ctdb_fatal(ctdb, "Failed to run init event\n");
787         }
788         ctdb_run_notification_script(ctdb, "init");
789
790         /* now start accepting clients, only can do this once frozen */
791         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
792                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
793                            ctdb_accept_client, ctdb);
794
795         /* tell all other nodes we've just started up */
796         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
797                                  0, CTDB_CONTROL_STARTUP, 0,
798                                  CTDB_CTRL_FLAG_NOREPLY,
799                                  tdb_null, NULL, NULL);
800
801         /* release any IPs we hold from previous runs of the daemon */
802         ctdb_release_all_ips(ctdb);
803
804         /* start the transport going */
805         ctdb_start_transport(ctdb);
806
807         /* set up a handler to pick up sigchld */
808         se = event_add_signal(ctdb->ev, ctdb,
809                                      SIGCHLD, 0,
810                                      sig_child_handler,
811                                      ctdb);
812         if (se == NULL) {
813                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
814                 exit(1);
815         }
816
817         if (use_syslog) {
818                 if (start_syslog_daemon(ctdb)) {
819                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
820                         exit(10);
821                 }
822         }
823
824         ctdb_lockdown_memory(ctdb);
825           
826         /* go into a wait loop to allow other nodes to complete */
827         event_loop_wait(ctdb->ev);
828
829         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
830         exit(1);
831 }
832
833 /*
834   allocate a packet for use in daemon<->daemon communication
835  */
836 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
837                                                  TALLOC_CTX *mem_ctx, 
838                                                  enum ctdb_operation operation, 
839                                                  size_t length, size_t slength,
840                                                  const char *type)
841 {
842         int size;
843         struct ctdb_req_header *hdr;
844
845         length = MAX(length, slength);
846         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
847
848         if (ctdb->methods == NULL) {
849                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
850                          operation, (unsigned)length));
851                 return NULL;
852         }
853
854         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
855         if (hdr == NULL) {
856                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
857                          operation, (unsigned)length));
858                 return NULL;
859         }
860         talloc_set_name_const(hdr, type);
861         memset(hdr, 0, slength);
862         hdr->length       = length;
863         hdr->operation    = operation;
864         hdr->ctdb_magic   = CTDB_MAGIC;
865         hdr->ctdb_version = CTDB_VERSION;
866         hdr->generation   = ctdb->vnn_map->generation;
867         hdr->srcnode      = ctdb->pnn;
868
869         return hdr;     
870 }
871
872 struct daemon_control_state {
873         struct daemon_control_state *next, *prev;
874         struct ctdb_client *client;
875         struct ctdb_req_control *c;
876         uint32_t reqid;
877         struct ctdb_node *node;
878 };
879
880 /*
881   callback when a control reply comes in
882  */
883 static void daemon_control_callback(struct ctdb_context *ctdb,
884                                     int32_t status, TDB_DATA data, 
885                                     const char *errormsg,
886                                     void *private_data)
887 {
888         struct daemon_control_state *state = talloc_get_type(private_data, 
889                                                              struct daemon_control_state);
890         struct ctdb_client *client = state->client;
891         struct ctdb_reply_control *r;
892         size_t len;
893
894         /* construct a message to send to the client containing the data */
895         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
896         if (errormsg) {
897                 len += strlen(errormsg);
898         }
899         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
900                                struct ctdb_reply_control);
901         CTDB_NO_MEMORY_VOID(ctdb, r);
902
903         r->hdr.reqid     = state->reqid;
904         r->status        = status;
905         r->datalen       = data.dsize;
906         r->errorlen = 0;
907         memcpy(&r->data[0], data.dptr, data.dsize);
908         if (errormsg) {
909                 r->errorlen = strlen(errormsg);
910                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
911         }
912
913         daemon_queue_send(client, &r->hdr);
914
915         talloc_free(state);
916 }
917
918 /*
919   fail all pending controls to a disconnected node
920  */
921 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
922 {
923         struct daemon_control_state *state;
924         while ((state = node->pending_controls)) {
925                 DLIST_REMOVE(node->pending_controls, state);
926                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
927                                         "node is disconnected", state);
928         }
929 }
930
931 /*
932   destroy a daemon_control_state
933  */
934 static int daemon_control_destructor(struct daemon_control_state *state)
935 {
936         if (state->node) {
937                 DLIST_REMOVE(state->node->pending_controls, state);
938         }
939         return 0;
940 }
941
942 /*
943   this is called when the ctdb daemon received a ctdb request control
944   from a local client over the unix domain socket
945  */
946 static void daemon_request_control_from_client(struct ctdb_client *client, 
947                                                struct ctdb_req_control *c)
948 {
949         TDB_DATA data;
950         int res;
951         struct daemon_control_state *state;
952         TALLOC_CTX *tmp_ctx = talloc_new(client);
953
954         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
955                 c->hdr.destnode = client->ctdb->pnn;
956         }
957
958         state = talloc(client, struct daemon_control_state);
959         CTDB_NO_MEMORY_VOID(client->ctdb, state);
960
961         state->client = client;
962         state->c = talloc_steal(state, c);
963         state->reqid = c->hdr.reqid;
964         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
965                 state->node = client->ctdb->nodes[c->hdr.destnode];
966                 DLIST_ADD(state->node->pending_controls, state);
967         } else {
968                 state->node = NULL;
969         }
970
971         talloc_set_destructor(state, daemon_control_destructor);
972
973         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
974                 talloc_steal(tmp_ctx, state);
975         }
976         
977         data.dptr = &c->data[0];
978         data.dsize = c->datalen;
979         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
980                                        c->srvid, c->opcode, client->client_id,
981                                        c->flags,
982                                        data, daemon_control_callback,
983                                        state);
984         if (res != 0) {
985                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
986                          c->hdr.destnode));
987         }
988
989         talloc_free(tmp_ctx);
990 }
991
992 /*
993   register a call function
994 */
995 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
996                          ctdb_fn_t fn, int id)
997 {
998         struct ctdb_registered_call *call;
999         struct ctdb_db_context *ctdb_db;
1000
1001         ctdb_db = find_ctdb_db(ctdb, db_id);
1002         if (ctdb_db == NULL) {
1003                 return -1;
1004         }
1005
1006         call = talloc(ctdb_db, struct ctdb_registered_call);
1007         call->fn = fn;
1008         call->id = id;
1009
1010         DLIST_ADD(ctdb_db->calls, call);        
1011         return 0;
1012 }
1013
1014
1015
1016 /*
1017   this local messaging handler is ugly, but is needed to prevent
1018   recursion in ctdb_send_message() when the destination node is the
1019   same as the source node
1020  */
1021 struct ctdb_local_message {
1022         struct ctdb_context *ctdb;
1023         uint64_t srvid;
1024         TDB_DATA data;
1025 };
1026
1027 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1028                                        struct timeval t, void *private_data)
1029 {
1030         struct ctdb_local_message *m = talloc_get_type(private_data, 
1031                                                        struct ctdb_local_message);
1032         int res;
1033
1034         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1035         if (res != 0) {
1036                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1037                           (unsigned long long)m->srvid));
1038         }
1039         talloc_free(m);
1040 }
1041
1042 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1043 {
1044         struct ctdb_local_message *m;
1045         m = talloc(ctdb, struct ctdb_local_message);
1046         CTDB_NO_MEMORY(ctdb, m);
1047
1048         m->ctdb = ctdb;
1049         m->srvid = srvid;
1050         m->data  = data;
1051         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1052         if (m->data.dptr == NULL) {
1053                 talloc_free(m);
1054                 return -1;
1055         }
1056
1057         /* this needs to be done as an event to prevent recursion */
1058         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1059         return 0;
1060 }
1061
1062 /*
1063   send a ctdb message
1064 */
1065 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1066                              uint64_t srvid, TDB_DATA data)
1067 {
1068         struct ctdb_req_message *r;
1069         int len;
1070
1071         if (ctdb->methods == NULL) {
1072                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1073                 return -1;
1074         }
1075
1076         /* see if this is a message to ourselves */
1077         if (pnn == ctdb->pnn) {
1078                 return ctdb_local_message(ctdb, srvid, data);
1079         }
1080
1081         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1082         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1083                                     struct ctdb_req_message);
1084         CTDB_NO_MEMORY(ctdb, r);
1085
1086         r->hdr.destnode  = pnn;
1087         r->srvid         = srvid;
1088         r->datalen       = data.dsize;
1089         memcpy(&r->data[0], data.dptr, data.dsize);
1090
1091         ctdb_queue_packet(ctdb, &r->hdr);
1092
1093         talloc_free(r);
1094         return 0;
1095 }
1096
1097
1098
1099 struct ctdb_client_notify_list {
1100         struct ctdb_client_notify_list *next, *prev;
1101         struct ctdb_context *ctdb;
1102         uint64_t srvid;
1103         TDB_DATA data;
1104 };
1105
1106
1107 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1108 {
1109         int ret;
1110
1111         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1112
1113         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1114         if (ret != 0) {
1115                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1116         }
1117
1118         return 0;
1119 }
1120
1121 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1122 {
1123         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1124         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1125         struct ctdb_client_notify_list *nl;
1126
1127         DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1128
1129         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1130                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1131                 return -1;
1132         }
1133
1134         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1135                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1136                 return -1;
1137         }
1138
1139
1140         if (client == NULL) {
1141                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1142                 return -1;
1143         }
1144
1145         for(nl=client->notify; nl; nl=nl->next) {
1146                 if (nl->srvid == notify->srvid) {
1147                         break;
1148                 }
1149         }
1150         if (nl != NULL) {
1151                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1152                 return -1;
1153         }
1154
1155         nl = talloc(client, struct ctdb_client_notify_list);
1156         CTDB_NO_MEMORY(ctdb, nl);
1157         nl->ctdb       = ctdb;
1158         nl->srvid      = notify->srvid;
1159         nl->data.dsize = notify->len;
1160         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1161         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1162         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1163         
1164         DLIST_ADD(client->notify, nl);
1165         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1166
1167         return 0;
1168 }
1169
1170 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1171 {
1172         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1173         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1174         struct ctdb_client_notify_list *nl;
1175
1176         DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1177
1178         if (client == NULL) {
1179                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1180                 return -1;
1181         }
1182
1183         for(nl=client->notify; nl; nl=nl->next) {
1184                 if (nl->srvid == notify->srvid) {
1185                         break;
1186                 }
1187         }
1188         if (nl == NULL) {
1189                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1190                 return -1;
1191         }
1192
1193         DLIST_REMOVE(client->notify, nl);
1194         talloc_set_destructor(nl, NULL);
1195         talloc_free(nl);
1196
1197         return 0;
1198 }
1199
1200 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1201 {
1202         struct ctdb_client_pid_list *client_pid;
1203
1204         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1205                 if (client_pid->pid == pid) {
1206                         return client_pid->client;
1207                 }
1208         }
1209         return NULL;
1210 }
1211
1212
1213 /* This control is used by samba when probing if a process (of a samba daemon)
1214    exists on the node.
1215    Samba does this when it needs/wants to check if a subrecord in one of the
1216    databases is still valied, or if it is stale and can be removed.
1217    If the node is in unhealthy or stopped state we just kill of the samba
1218    process holding htis sub-record and return to the calling samba that
1219    the process does not exist.
1220    This allows us to forcefully recall subrecords registered by samba processes
1221    on banned and stopped nodes.
1222 */
1223 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1224 {
1225         struct ctdb_client *client;
1226
1227         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1228                 client = ctdb_find_client_by_pid(ctdb, pid);
1229                 if (client != NULL) {
1230                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1231                         talloc_free(client);
1232                 }
1233                 return -1;
1234         }
1235
1236         return kill(pid, 0);
1237 }