Merge commit 'origin/master'
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
104                         talloc_free(client);
105                         return -1;
106                 }
107         }
108         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
109 }
110
111 /*
112   message handler for when we are in daemon mode. This redirects the message
113   to the right client
114  */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
116                                     TDB_DATA data, void *private_data)
117 {
118         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119         struct ctdb_req_message *r;
120         int len;
121
122         /* construct a message to send to the client containing the data */
123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
124         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
125                                len, struct ctdb_req_message);
126         CTDB_NO_MEMORY_VOID(ctdb, r);
127
128         talloc_set_name_const(r, "req_message packet");
129
130         r->srvid         = srvid;
131         r->datalen       = data.dsize;
132         memcpy(&r->data[0], data.dptr, data.dsize);
133
134         daemon_queue_send(client, &r->hdr);
135
136         talloc_free(r);
137 }
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         if (client->ctdb->statistics.num_clients) {
188                 client->ctdb->statistics.num_clients--;
189         }
190
191         if (client->num_persistent_updates != 0) {
192                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194         }
195         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196         if (ctdb_db) {
197                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198                                   "commit active. Forcing recovery.\n"));
199                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200                 ctdb_db->transaction_active = false;
201         }
202
203         return 0;
204 }
205
206
207 /*
208   this is called when the ctdb daemon received a ctdb request message
209   from a local client over the unix domain socket
210  */
211 static void daemon_request_message_from_client(struct ctdb_client *client, 
212                                                struct ctdb_req_message *c)
213 {
214         TDB_DATA data;
215         int res;
216
217         /* maybe the message is for another client on this node */
218         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
220                 return;
221         }
222
223         /* its for a remote node */
224         data.dptr = &c->data[0];
225         data.dsize = c->datalen;
226         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227                                        c->srvid, data);
228         if (res != 0) {
229                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
230                          c->hdr.destnode));
231         }
232 }
233
234
235 struct daemon_call_state {
236         struct ctdb_client *client;
237         uint32_t reqid;
238         struct ctdb_call *call;
239         struct timeval start_time;
240 };
241
242 /* 
243    complete a call from a client 
244 */
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 {
247         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
248                                                            struct daemon_call_state);
249         struct ctdb_reply_call *r;
250         int res;
251         uint32_t length;
252         struct ctdb_client *client = dstate->client;
253         struct ctdb_db_context *ctdb_db = state->ctdb_db;
254
255         talloc_steal(client, dstate);
256         talloc_steal(dstate, dstate->call);
257
258         res = ctdb_daemon_call_recv(state, dstate->call);
259         if (res != 0) {
260                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261                 if (client->ctdb->statistics.pending_calls > 0) {
262                         client->ctdb->statistics.pending_calls--;
263                 }
264                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
265                 return;
266         }
267
268         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
270                                length, struct ctdb_reply_call);
271         if (r == NULL) {
272                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273                 if (client->ctdb->statistics.pending_calls > 0) {
274                         client->ctdb->statistics.pending_calls--;
275                 }
276                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
277                 return;
278         }
279         r->hdr.reqid        = dstate->reqid;
280         r->datalen          = dstate->call->reply_data.dsize;
281         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282
283         res = daemon_queue_send(client, &r->hdr);
284         if (res == -1) {
285                 /* client is dead - return immediately */
286                 return;
287         }
288         if (res != 0) {
289                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
290         }
291         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
292         talloc_free(dstate);
293         if (client->ctdb->statistics.pending_calls > 0) {
294                 client->ctdb->statistics.pending_calls--;
295         }
296 }
297
298 struct ctdb_daemon_packet_wrap {
299         struct ctdb_context *ctdb;
300         uint32_t client_id;
301 };
302
303 /*
304   a wrapper to catch disconnected clients
305  */
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
307 {
308         struct ctdb_client *client;
309         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
310                                                             struct ctdb_daemon_packet_wrap);
311         if (w == NULL) {
312                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
313                 return;
314         }
315
316         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317         if (client == NULL) {
318                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
319                          w->client_id));
320                 talloc_free(w);
321                 return;
322         }
323         talloc_free(w);
324
325         /* process it */
326         daemon_incoming_packet(client, hdr);    
327 }
328
329
330 /*
331   this is called when the ctdb daemon received a ctdb request call
332   from a local client over the unix domain socket
333  */
334 static void daemon_request_call_from_client(struct ctdb_client *client, 
335                                             struct ctdb_req_call *c)
336 {
337         struct ctdb_call_state *state;
338         struct ctdb_db_context *ctdb_db;
339         struct daemon_call_state *dstate;
340         struct ctdb_call *call;
341         struct ctdb_ltdb_header header;
342         TDB_DATA key, data;
343         int ret;
344         struct ctdb_context *ctdb = client->ctdb;
345         struct ctdb_daemon_packet_wrap *w;
346
347         ctdb->statistics.total_calls++;
348         if (client->ctdb->statistics.pending_calls > 0) {
349                 ctdb->statistics.pending_calls++;
350         }
351
352         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
353         if (!ctdb_db) {
354                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
355                           c->db_id));
356                 if (client->ctdb->statistics.pending_calls > 0) {
357                         ctdb->statistics.pending_calls--;
358                 }
359                 return;
360         }
361
362         if (ctdb_db->unhealthy_reason) {
363                 /*
364                  * this is just a warning, as the tdb should be empty anyway,
365                  * and only persistent databases can be unhealthy, which doesn't
366                  * use this code patch
367                  */
368                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
370         }
371
372         key.dptr = c->data;
373         key.dsize = c->keylen;
374
375         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376         CTDB_NO_MEMORY_VOID(ctdb, w);   
377
378         w->ctdb = ctdb;
379         w->client_id = client->client_id;
380
381         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
382                                            (struct ctdb_req_header *)c, &data,
383                                            daemon_incoming_packet_wrap, w, True);
384         if (ret == -2) {
385                 /* will retry later */
386                 if (client->ctdb->statistics.pending_calls > 0) {
387                         ctdb->statistics.pending_calls--;
388                 }
389                 return;
390         }
391
392         talloc_free(w);
393
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396                 if (client->ctdb->statistics.pending_calls > 0) {
397                         ctdb->statistics.pending_calls--;
398                 }
399                 return;
400         }
401
402         dstate = talloc(client, struct daemon_call_state);
403         if (dstate == NULL) {
404                 ctdb_ltdb_unlock(ctdb_db, key);
405                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
406                 if (client->ctdb->statistics.pending_calls > 0) {
407                         ctdb->statistics.pending_calls--;
408                 }
409                 return;
410         }
411         dstate->start_time = timeval_current();
412         dstate->client = client;
413         dstate->reqid  = c->hdr.reqid;
414         talloc_steal(dstate, data.dptr);
415
416         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
417         if (call == NULL) {
418                 ctdb_ltdb_unlock(ctdb_db, key);
419                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
420                 if (client->ctdb->statistics.pending_calls > 0) {
421                         ctdb->statistics.pending_calls--;
422                 }
423                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
424                 return;
425         }
426
427         call->call_id = c->callid;
428         call->key = key;
429         call->call_data.dptr = c->data + c->keylen;
430         call->call_data.dsize = c->calldatalen;
431         call->flags = c->flags;
432
433         if (header.dmaster == ctdb->pnn) {
434                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
435         } else {
436                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
437         }
438
439         ctdb_ltdb_unlock(ctdb_db, key);
440
441         if (state == NULL) {
442                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
443                 if (client->ctdb->statistics.pending_calls > 0) {
444                         ctdb->statistics.pending_calls--;
445                 }
446                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
447                 return;
448         }
449         talloc_steal(state, dstate);
450         talloc_steal(client, state);
451
452         state->async.fn = daemon_call_from_client_callback;
453         state->async.private_data = dstate;
454 }
455
456
457 static void daemon_request_control_from_client(struct ctdb_client *client, 
458                                                struct ctdb_req_control *c);
459
460 /* data contains a packet from the client */
461 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
462 {
463         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
464         TALLOC_CTX *tmp_ctx;
465         struct ctdb_context *ctdb = client->ctdb;
466
467         /* place the packet as a child of a tmp_ctx. We then use
468            talloc_free() below to free it. If any of the calls want
469            to keep it, then they will steal it somewhere else, and the
470            talloc_free() will be a no-op */
471         tmp_ctx = talloc_new(client);
472         talloc_steal(tmp_ctx, hdr);
473
474         if (hdr->ctdb_magic != CTDB_MAGIC) {
475                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
476                 goto done;
477         }
478
479         if (hdr->ctdb_version != CTDB_VERSION) {
480                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
481                 goto done;
482         }
483
484         switch (hdr->operation) {
485         case CTDB_REQ_CALL:
486                 ctdb->statistics.client.req_call++;
487                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
488                 break;
489
490         case CTDB_REQ_MESSAGE:
491                 ctdb->statistics.client.req_message++;
492                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
493                 break;
494
495         case CTDB_REQ_CONTROL:
496                 ctdb->statistics.client.req_control++;
497                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
498                 break;
499
500         default:
501                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
502                          hdr->operation));
503         }
504
505 done:
506         talloc_free(tmp_ctx);
507 }
508
509 /*
510   called when the daemon gets a incoming packet
511  */
512 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
513 {
514         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
515         struct ctdb_req_header *hdr;
516
517         if (cnt == 0) {
518                 talloc_free(client);
519                 return;
520         }
521
522         client->ctdb->statistics.client_packets_recv++;
523
524         if (cnt < sizeof(*hdr)) {
525                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
526                                (unsigned)cnt);
527                 return;
528         }
529         hdr = (struct ctdb_req_header *)data;
530         if (cnt != hdr->length) {
531                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
532                                (unsigned)hdr->length, (unsigned)cnt);
533                 return;
534         }
535
536         if (hdr->ctdb_magic != CTDB_MAGIC) {
537                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
538                 return;
539         }
540
541         if (hdr->ctdb_version != CTDB_VERSION) {
542                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
543                 return;
544         }
545
546         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
547                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
548                  hdr->srcnode, hdr->destnode));
549
550         /* it is the responsibility of the incoming packet function to free 'data' */
551         daemon_incoming_packet(client, hdr);
552 }
553
554
555 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
556 {
557         if (client_pid->ctdb->client_pids != NULL) {
558                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
559         }
560
561         return 0;
562 }
563
564
565 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
566                          uint16_t flags, void *private_data)
567 {
568         struct sockaddr_un addr;
569         socklen_t len;
570         int fd;
571         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
572         struct ctdb_client *client;
573         struct ctdb_client_pid_list *client_pid;
574 #ifdef _AIX
575         struct peercred_struct cr;
576         socklen_t crl = sizeof(struct peercred_struct);
577 #else
578         struct ucred cr;
579         socklen_t crl = sizeof(struct ucred);
580 #endif
581
582         memset(&addr, 0, sizeof(addr));
583         len = sizeof(addr);
584         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
585         if (fd == -1) {
586                 return;
587         }
588
589         set_nonblocking(fd);
590         set_close_on_exec(fd);
591
592         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
593
594         client = talloc_zero(ctdb, struct ctdb_client);
595 #ifdef _AIX
596         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
597 #else
598         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
599 #endif
600                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
601         }
602
603         client->ctdb = ctdb;
604         client->fd = fd;
605         client->client_id = ctdb_reqid_new(ctdb, client);
606         client->pid = cr.pid;
607
608         client_pid = talloc(client, struct ctdb_client_pid_list);
609         if (client_pid == NULL) {
610                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
611                 close(fd);
612                 talloc_free(client);
613                 return;
614         }               
615         client_pid->ctdb   = ctdb;
616         client_pid->pid    = cr.pid;
617         client_pid->client = client;
618
619         DLIST_ADD(ctdb->client_pids, client_pid);
620
621         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
622                                          ctdb_daemon_read_cb, client);
623
624         talloc_set_destructor(client, ctdb_client_destructor);
625         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
626         ctdb->statistics.num_clients++;
627 }
628
629
630
631 /*
632   create a unix domain socket and bind it
633   return a file descriptor open on the socket 
634 */
635 static int ux_socket_bind(struct ctdb_context *ctdb)
636 {
637         struct sockaddr_un addr;
638
639         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
640         if (ctdb->daemon.sd == -1) {
641                 return -1;
642         }
643
644         set_close_on_exec(ctdb->daemon.sd);
645         set_nonblocking(ctdb->daemon.sd);
646
647         memset(&addr, 0, sizeof(addr));
648         addr.sun_family = AF_UNIX;
649         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
650
651         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
652                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
653                 goto failed;
654         }       
655
656         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
657             chmod(ctdb->daemon.name, 0700) != 0) {
658                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
659                 goto failed;
660         } 
661
662
663         if (listen(ctdb->daemon.sd, 100) != 0) {
664                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
665                 goto failed;
666         }
667
668         return 0;
669
670 failed:
671         close(ctdb->daemon.sd);
672         ctdb->daemon.sd = -1;
673         return -1;      
674 }
675
676 static void sig_child_handler(struct event_context *ev,
677         struct signal_event *se, int signum, int count,
678         void *dont_care, 
679         void *private_data)
680 {
681 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
682         int status;
683         pid_t pid = -1;
684
685         while (pid != 0) {
686                 pid = waitpid(-1, &status, WNOHANG);
687                 if (pid == -1) {
688                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
689                         return;
690                 }
691                 if (pid > 0) {
692                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
693                 }
694         }
695 }
696
697 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
698                                       void *private_data)
699 {
700         if (status != 0) {
701                 ctdb_fatal(ctdb, "Failed to run setup event\n");
702                 return;
703         }
704         ctdb_run_notification_script(ctdb, "setup");
705
706         /* tell all other nodes we've just started up */
707         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
708                                  0, CTDB_CONTROL_STARTUP, 0,
709                                  CTDB_CTRL_FLAG_NOREPLY,
710                                  tdb_null, NULL, NULL);
711 }
712
713 /*
714   start the protocol going as a daemon
715 */
716 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
717 {
718         int res, ret = -1;
719         struct fd_event *fde;
720         const char *domain_socket_name;
721         struct signal_event *se;
722
723         /* get rid of any old sockets */
724         unlink(ctdb->daemon.name);
725
726         /* create a unix domain stream socket to listen to */
727         res = ux_socket_bind(ctdb);
728         if (res!=0) {
729                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
730                 exit(10);
731         }
732
733         if (do_fork && fork()) {
734                 return 0;
735         }
736
737         tdb_reopen_all(False);
738
739         if (do_fork) {
740                 setsid();
741                 close(0);
742                 if (open("/dev/null", O_RDONLY) != 0) {
743                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
744                         exit(11);
745                 }
746         }
747         block_signal(SIGPIPE);
748
749         ctdbd_pid = getpid();
750
751
752         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
753
754         ctdb_high_priority(ctdb);
755
756         /* ensure the socket is deleted on exit of the daemon */
757         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
758         if (domain_socket_name == NULL) {
759                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
760                 exit(12);
761         }
762
763         ctdb->ev = event_context_init(NULL);
764
765         ctdb_set_child_logging(ctdb);
766
767         /* force initial recovery for election */
768         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
769
770         if (strcmp(ctdb->transport, "tcp") == 0) {
771                 int ctdb_tcp_init(struct ctdb_context *);
772                 ret = ctdb_tcp_init(ctdb);
773         }
774 #ifdef USE_INFINIBAND
775         if (strcmp(ctdb->transport, "ib") == 0) {
776                 int ctdb_ibw_init(struct ctdb_context *);
777                 ret = ctdb_ibw_init(ctdb);
778         }
779 #endif
780         if (ret != 0) {
781                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
782                 return -1;
783         }
784
785         if (ctdb->methods == NULL) {
786                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
787                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
788         }
789
790         /* initialise the transport  */
791         if (ctdb->methods->initialise(ctdb) != 0) {
792                 ctdb_fatal(ctdb, "transport failed to initialise");
793         }
794
795         /* attach to existing databases */
796         if (ctdb_attach_databases(ctdb) != 0) {
797                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
798         }
799
800         /* start frozen, then let the first election sort things out */
801         if (ctdb_blocking_freeze(ctdb)) {
802                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
803         }
804
805         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
806         if (ret != 0) {
807                 ctdb_fatal(ctdb, "Failed to run init event\n");
808         }
809         ctdb_run_notification_script(ctdb, "init");
810
811         /* now start accepting clients, only can do this once frozen */
812         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
813                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
814                            ctdb_accept_client, ctdb);
815
816         /* release any IPs we hold from previous runs of the daemon */
817         ctdb_release_all_ips(ctdb);
818
819         /* start the transport going */
820         ctdb_start_transport(ctdb);
821
822         /* set up a handler to pick up sigchld */
823         se = event_add_signal(ctdb->ev, ctdb,
824                                      SIGCHLD, 0,
825                                      sig_child_handler,
826                                      ctdb);
827         if (se == NULL) {
828                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
829                 exit(1);
830         }
831
832         ret = ctdb_event_script_callback(ctdb,
833                                          ctdb,
834                                          ctdb_setup_event_callback,
835                                          ctdb,
836                                          false,
837                                          CTDB_EVENT_SETUP,
838                                          "");
839         if (ret != 0) {
840                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
841                 exit(1);
842         }
843
844         if (use_syslog) {
845                 if (start_syslog_daemon(ctdb)) {
846                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
847                         exit(10);
848                 }
849         }
850
851         ctdb_lockdown_memory(ctdb);
852           
853         /* go into a wait loop to allow other nodes to complete */
854         event_loop_wait(ctdb->ev);
855
856         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
857         exit(1);
858 }
859
860 /*
861   allocate a packet for use in daemon<->daemon communication
862  */
863 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
864                                                  TALLOC_CTX *mem_ctx, 
865                                                  enum ctdb_operation operation, 
866                                                  size_t length, size_t slength,
867                                                  const char *type)
868 {
869         int size;
870         struct ctdb_req_header *hdr;
871
872         length = MAX(length, slength);
873         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
874
875         if (ctdb->methods == NULL) {
876                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
877                          operation, (unsigned)length));
878                 return NULL;
879         }
880
881         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
882         if (hdr == NULL) {
883                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
884                          operation, (unsigned)length));
885                 return NULL;
886         }
887         talloc_set_name_const(hdr, type);
888         memset(hdr, 0, slength);
889         hdr->length       = length;
890         hdr->operation    = operation;
891         hdr->ctdb_magic   = CTDB_MAGIC;
892         hdr->ctdb_version = CTDB_VERSION;
893         hdr->generation   = ctdb->vnn_map->generation;
894         hdr->srcnode      = ctdb->pnn;
895
896         return hdr;     
897 }
898
899 struct daemon_control_state {
900         struct daemon_control_state *next, *prev;
901         struct ctdb_client *client;
902         struct ctdb_req_control *c;
903         uint32_t reqid;
904         struct ctdb_node *node;
905 };
906
907 /*
908   callback when a control reply comes in
909  */
910 static void daemon_control_callback(struct ctdb_context *ctdb,
911                                     int32_t status, TDB_DATA data, 
912                                     const char *errormsg,
913                                     void *private_data)
914 {
915         struct daemon_control_state *state = talloc_get_type(private_data, 
916                                                              struct daemon_control_state);
917         struct ctdb_client *client = state->client;
918         struct ctdb_reply_control *r;
919         size_t len;
920         int ret;
921
922         /* construct a message to send to the client containing the data */
923         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
924         if (errormsg) {
925                 len += strlen(errormsg);
926         }
927         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
928                                struct ctdb_reply_control);
929         CTDB_NO_MEMORY_VOID(ctdb, r);
930
931         r->hdr.reqid     = state->reqid;
932         r->status        = status;
933         r->datalen       = data.dsize;
934         r->errorlen = 0;
935         memcpy(&r->data[0], data.dptr, data.dsize);
936         if (errormsg) {
937                 r->errorlen = strlen(errormsg);
938                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
939         }
940
941         ret = daemon_queue_send(client, &r->hdr);
942         if (ret != -1) {
943                 talloc_free(state);
944         }
945 }
946
947 /*
948   fail all pending controls to a disconnected node
949  */
950 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
951 {
952         struct daemon_control_state *state;
953         while ((state = node->pending_controls)) {
954                 DLIST_REMOVE(node->pending_controls, state);
955                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
956                                         "node is disconnected", state);
957         }
958 }
959
960 /*
961   destroy a daemon_control_state
962  */
963 static int daemon_control_destructor(struct daemon_control_state *state)
964 {
965         if (state->node) {
966                 DLIST_REMOVE(state->node->pending_controls, state);
967         }
968         return 0;
969 }
970
971 /*
972   this is called when the ctdb daemon received a ctdb request control
973   from a local client over the unix domain socket
974  */
975 static void daemon_request_control_from_client(struct ctdb_client *client, 
976                                                struct ctdb_req_control *c)
977 {
978         TDB_DATA data;
979         int res;
980         struct daemon_control_state *state;
981         TALLOC_CTX *tmp_ctx = talloc_new(client);
982
983         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
984                 c->hdr.destnode = client->ctdb->pnn;
985         }
986
987         state = talloc(client, struct daemon_control_state);
988         CTDB_NO_MEMORY_VOID(client->ctdb, state);
989
990         state->client = client;
991         state->c = talloc_steal(state, c);
992         state->reqid = c->hdr.reqid;
993         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
994                 state->node = client->ctdb->nodes[c->hdr.destnode];
995                 DLIST_ADD(state->node->pending_controls, state);
996         } else {
997                 state->node = NULL;
998         }
999
1000         talloc_set_destructor(state, daemon_control_destructor);
1001
1002         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1003                 talloc_steal(tmp_ctx, state);
1004         }
1005         
1006         data.dptr = &c->data[0];
1007         data.dsize = c->datalen;
1008         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1009                                        c->srvid, c->opcode, client->client_id,
1010                                        c->flags,
1011                                        data, daemon_control_callback,
1012                                        state);
1013         if (res != 0) {
1014                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1015                          c->hdr.destnode));
1016         }
1017
1018         talloc_free(tmp_ctx);
1019 }
1020
1021 /*
1022   register a call function
1023 */
1024 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1025                          ctdb_fn_t fn, int id)
1026 {
1027         struct ctdb_registered_call *call;
1028         struct ctdb_db_context *ctdb_db;
1029
1030         ctdb_db = find_ctdb_db(ctdb, db_id);
1031         if (ctdb_db == NULL) {
1032                 return -1;
1033         }
1034
1035         call = talloc(ctdb_db, struct ctdb_registered_call);
1036         call->fn = fn;
1037         call->id = id;
1038
1039         DLIST_ADD(ctdb_db->calls, call);        
1040         return 0;
1041 }
1042
1043
1044
1045 /*
1046   this local messaging handler is ugly, but is needed to prevent
1047   recursion in ctdb_send_message() when the destination node is the
1048   same as the source node
1049  */
1050 struct ctdb_local_message {
1051         struct ctdb_context *ctdb;
1052         uint64_t srvid;
1053         TDB_DATA data;
1054 };
1055
1056 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1057                                        struct timeval t, void *private_data)
1058 {
1059         struct ctdb_local_message *m = talloc_get_type(private_data, 
1060                                                        struct ctdb_local_message);
1061         int res;
1062
1063         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1064         if (res != 0) {
1065                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1066                           (unsigned long long)m->srvid));
1067         }
1068         talloc_free(m);
1069 }
1070
1071 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1072 {
1073         struct ctdb_local_message *m;
1074         m = talloc(ctdb, struct ctdb_local_message);
1075         CTDB_NO_MEMORY(ctdb, m);
1076
1077         m->ctdb = ctdb;
1078         m->srvid = srvid;
1079         m->data  = data;
1080         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1081         if (m->data.dptr == NULL) {
1082                 talloc_free(m);
1083                 return -1;
1084         }
1085
1086         /* this needs to be done as an event to prevent recursion */
1087         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1088         return 0;
1089 }
1090
1091 /*
1092   send a ctdb message
1093 */
1094 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1095                              uint64_t srvid, TDB_DATA data)
1096 {
1097         struct ctdb_req_message *r;
1098         int len;
1099
1100         if (ctdb->methods == NULL) {
1101                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1102                 return -1;
1103         }
1104
1105         /* see if this is a message to ourselves */
1106         if (pnn == ctdb->pnn) {
1107                 return ctdb_local_message(ctdb, srvid, data);
1108         }
1109
1110         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1111         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1112                                     struct ctdb_req_message);
1113         CTDB_NO_MEMORY(ctdb, r);
1114
1115         r->hdr.destnode  = pnn;
1116         r->srvid         = srvid;
1117         r->datalen       = data.dsize;
1118         memcpy(&r->data[0], data.dptr, data.dsize);
1119
1120         ctdb_queue_packet(ctdb, &r->hdr);
1121
1122         talloc_free(r);
1123         return 0;
1124 }
1125
1126
1127
1128 struct ctdb_client_notify_list {
1129         struct ctdb_client_notify_list *next, *prev;
1130         struct ctdb_context *ctdb;
1131         uint64_t srvid;
1132         TDB_DATA data;
1133 };
1134
1135
1136 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1137 {
1138         int ret;
1139
1140         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1141
1142         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1143         if (ret != 0) {
1144                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1145         }
1146
1147         return 0;
1148 }
1149
1150 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1151 {
1152         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1153         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1154         struct ctdb_client_notify_list *nl;
1155
1156         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1157
1158         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1159                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1160                 return -1;
1161         }
1162
1163         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1164                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1165                 return -1;
1166         }
1167
1168
1169         if (client == NULL) {
1170                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1171                 return -1;
1172         }
1173
1174         for(nl=client->notify; nl; nl=nl->next) {
1175                 if (nl->srvid == notify->srvid) {
1176                         break;
1177                 }
1178         }
1179         if (nl != NULL) {
1180                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1181                 return -1;
1182         }
1183
1184         nl = talloc(client, struct ctdb_client_notify_list);
1185         CTDB_NO_MEMORY(ctdb, nl);
1186         nl->ctdb       = ctdb;
1187         nl->srvid      = notify->srvid;
1188         nl->data.dsize = notify->len;
1189         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1190         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1191         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1192         
1193         DLIST_ADD(client->notify, nl);
1194         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1195
1196         return 0;
1197 }
1198
1199 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1200 {
1201         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1202         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1203         struct ctdb_client_notify_list *nl;
1204
1205         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1206
1207         if (client == NULL) {
1208                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1209                 return -1;
1210         }
1211
1212         for(nl=client->notify; nl; nl=nl->next) {
1213                 if (nl->srvid == notify->srvid) {
1214                         break;
1215                 }
1216         }
1217         if (nl == NULL) {
1218                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1219                 return -1;
1220         }
1221
1222         DLIST_REMOVE(client->notify, nl);
1223         talloc_set_destructor(nl, NULL);
1224         talloc_free(nl);
1225
1226         return 0;
1227 }
1228
1229 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1230 {
1231         struct ctdb_client_pid_list *client_pid;
1232
1233         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1234                 if (client_pid->pid == pid) {
1235                         return client_pid->client;
1236                 }
1237         }
1238         return NULL;
1239 }
1240
1241
1242 /* This control is used by samba when probing if a process (of a samba daemon)
1243    exists on the node.
1244    Samba does this when it needs/wants to check if a subrecord in one of the
1245    databases is still valied, or if it is stale and can be removed.
1246    If the node is in unhealthy or stopped state we just kill of the samba
1247    process holding htis sub-record and return to the calling samba that
1248    the process does not exist.
1249    This allows us to forcefully recall subrecords registered by samba processes
1250    on banned and stopped nodes.
1251 */
1252 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1253 {
1254         struct ctdb_client *client;
1255
1256         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1257                 client = ctdb_find_client_by_pid(ctdb, pid);
1258                 if (client != NULL) {
1259                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1260                         talloc_free(client);
1261                 }
1262                 return -1;
1263         }
1264
1265         return kill(pid, 0);
1266 }