set up a handler to catch and log debug messages from the tevent layer
[metze/ctdb/wip.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
104                         talloc_free(client);
105                         return -1;
106                 }
107         }
108         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
109 }
110
111 /*
112   message handler for when we are in daemon mode. This redirects the message
113   to the right client
114  */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
116                                     TDB_DATA data, void *private_data)
117 {
118         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119         struct ctdb_req_message *r;
120         int len;
121
122         /* construct a message to send to the client containing the data */
123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
124         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
125                                len, struct ctdb_req_message);
126         CTDB_NO_MEMORY_VOID(ctdb, r);
127
128         talloc_set_name_const(r, "req_message packet");
129
130         r->srvid         = srvid;
131         r->datalen       = data.dsize;
132         memcpy(&r->data[0], data.dptr, data.dsize);
133
134         daemon_queue_send(client, &r->hdr);
135
136         talloc_free(r);
137 }
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         if (client->ctdb->statistics.num_clients) {
188                 client->ctdb->statistics.num_clients--;
189         }
190
191         if (client->num_persistent_updates != 0) {
192                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194         }
195         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196         if (ctdb_db) {
197                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198                                   "commit active. Forcing recovery.\n"));
199                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200                 ctdb_db->transaction_active = false;
201         }
202
203         return 0;
204 }
205
206
207 /*
208   this is called when the ctdb daemon received a ctdb request message
209   from a local client over the unix domain socket
210  */
211 static void daemon_request_message_from_client(struct ctdb_client *client, 
212                                                struct ctdb_req_message *c)
213 {
214         TDB_DATA data;
215         int res;
216
217         /* maybe the message is for another client on this node */
218         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
220                 return;
221         }
222
223         /* its for a remote node */
224         data.dptr = &c->data[0];
225         data.dsize = c->datalen;
226         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227                                        c->srvid, data);
228         if (res != 0) {
229                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
230                          c->hdr.destnode));
231         }
232 }
233
234
235 struct daemon_call_state {
236         struct ctdb_client *client;
237         uint32_t reqid;
238         struct ctdb_call *call;
239         struct timeval start_time;
240 };
241
242 /* 
243    complete a call from a client 
244 */
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 {
247         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
248                                                            struct daemon_call_state);
249         struct ctdb_reply_call *r;
250         int res;
251         uint32_t length;
252         struct ctdb_client *client = dstate->client;
253         struct ctdb_db_context *ctdb_db = state->ctdb_db;
254
255         talloc_steal(client, dstate);
256         talloc_steal(dstate, dstate->call);
257
258         res = ctdb_daemon_call_recv(state, dstate->call);
259         if (res != 0) {
260                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261                 if (client->ctdb->statistics.pending_calls > 0) {
262                         client->ctdb->statistics.pending_calls--;
263                 }
264                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
265                 return;
266         }
267
268         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
270                                length, struct ctdb_reply_call);
271         if (r == NULL) {
272                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273                 if (client->ctdb->statistics.pending_calls > 0) {
274                         client->ctdb->statistics.pending_calls--;
275                 }
276                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
277                 return;
278         }
279         r->hdr.reqid        = dstate->reqid;
280         r->datalen          = dstate->call->reply_data.dsize;
281         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282
283         res = daemon_queue_send(client, &r->hdr);
284         if (res == -1) {
285                 /* client is dead - return immediately */
286                 return;
287         }
288         if (res != 0) {
289                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
290         }
291         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
292         talloc_free(dstate);
293         if (client->ctdb->statistics.pending_calls > 0) {
294                 client->ctdb->statistics.pending_calls--;
295         }
296 }
297
298 struct ctdb_daemon_packet_wrap {
299         struct ctdb_context *ctdb;
300         uint32_t client_id;
301 };
302
303 /*
304   a wrapper to catch disconnected clients
305  */
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
307 {
308         struct ctdb_client *client;
309         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
310                                                             struct ctdb_daemon_packet_wrap);
311         if (w == NULL) {
312                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
313                 return;
314         }
315
316         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317         if (client == NULL) {
318                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
319                          w->client_id));
320                 talloc_free(w);
321                 return;
322         }
323         talloc_free(w);
324
325         /* process it */
326         daemon_incoming_packet(client, hdr);    
327 }
328
329
330 /*
331   this is called when the ctdb daemon received a ctdb request call
332   from a local client over the unix domain socket
333  */
334 static void daemon_request_call_from_client(struct ctdb_client *client, 
335                                             struct ctdb_req_call *c)
336 {
337         struct ctdb_call_state *state;
338         struct ctdb_db_context *ctdb_db;
339         struct daemon_call_state *dstate;
340         struct ctdb_call *call;
341         struct ctdb_ltdb_header header;
342         TDB_DATA key, data;
343         int ret;
344         struct ctdb_context *ctdb = client->ctdb;
345         struct ctdb_daemon_packet_wrap *w;
346
347         ctdb->statistics.total_calls++;
348         if (client->ctdb->statistics.pending_calls > 0) {
349                 ctdb->statistics.pending_calls++;
350         }
351
352         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
353         if (!ctdb_db) {
354                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
355                           c->db_id));
356                 if (client->ctdb->statistics.pending_calls > 0) {
357                         ctdb->statistics.pending_calls--;
358                 }
359                 return;
360         }
361
362         if (ctdb_db->unhealthy_reason) {
363                 /*
364                  * this is just a warning, as the tdb should be empty anyway,
365                  * and only persistent databases can be unhealthy, which doesn't
366                  * use this code patch
367                  */
368                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
370         }
371
372         key.dptr = c->data;
373         key.dsize = c->keylen;
374
375         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376         CTDB_NO_MEMORY_VOID(ctdb, w);   
377
378         w->ctdb = ctdb;
379         w->client_id = client->client_id;
380
381         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
382                                            (struct ctdb_req_header *)c, &data,
383                                            daemon_incoming_packet_wrap, w, True);
384         if (ret == -2) {
385                 /* will retry later */
386                 if (client->ctdb->statistics.pending_calls > 0) {
387                         ctdb->statistics.pending_calls--;
388                 }
389                 return;
390         }
391
392         talloc_free(w);
393
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396                 if (client->ctdb->statistics.pending_calls > 0) {
397                         ctdb->statistics.pending_calls--;
398                 }
399                 return;
400         }
401
402         dstate = talloc(client, struct daemon_call_state);
403         if (dstate == NULL) {
404                 ret = ctdb_ltdb_unlock(ctdb_db, key);
405                 if (ret != 0) {
406                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
407                 }
408
409                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
410                 if (client->ctdb->statistics.pending_calls > 0) {
411                         ctdb->statistics.pending_calls--;
412                 }
413                 return;
414         }
415         dstate->start_time = timeval_current();
416         dstate->client = client;
417         dstate->reqid  = c->hdr.reqid;
418         talloc_steal(dstate, data.dptr);
419
420         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
421         if (call == NULL) {
422                 ret = ctdb_ltdb_unlock(ctdb_db, key);
423                 if (ret != 0) {
424                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
425                 }
426
427                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
428                 if (client->ctdb->statistics.pending_calls > 0) {
429                         ctdb->statistics.pending_calls--;
430                 }
431                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
432                 return;
433         }
434
435         call->call_id = c->callid;
436         call->key = key;
437         call->call_data.dptr = c->data + c->keylen;
438         call->call_data.dsize = c->calldatalen;
439         call->flags = c->flags;
440
441         if (header.dmaster == ctdb->pnn) {
442                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
443         } else {
444                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
445         }
446
447         ret = ctdb_ltdb_unlock(ctdb_db, key);
448         if (ret != 0) {
449                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
450         }
451
452         if (state == NULL) {
453                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
454                 if (client->ctdb->statistics.pending_calls > 0) {
455                         ctdb->statistics.pending_calls--;
456                 }
457                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
458                 return;
459         }
460         talloc_steal(state, dstate);
461         talloc_steal(client, state);
462
463         state->async.fn = daemon_call_from_client_callback;
464         state->async.private_data = dstate;
465 }
466
467
468 static void daemon_request_control_from_client(struct ctdb_client *client, 
469                                                struct ctdb_req_control *c);
470
471 /* data contains a packet from the client */
472 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
473 {
474         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
475         TALLOC_CTX *tmp_ctx;
476         struct ctdb_context *ctdb = client->ctdb;
477
478         /* place the packet as a child of a tmp_ctx. We then use
479            talloc_free() below to free it. If any of the calls want
480            to keep it, then they will steal it somewhere else, and the
481            talloc_free() will be a no-op */
482         tmp_ctx = talloc_new(client);
483         talloc_steal(tmp_ctx, hdr);
484
485         if (hdr->ctdb_magic != CTDB_MAGIC) {
486                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
487                 goto done;
488         }
489
490         if (hdr->ctdb_version != CTDB_VERSION) {
491                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
492                 goto done;
493         }
494
495         switch (hdr->operation) {
496         case CTDB_REQ_CALL:
497                 ctdb->statistics.client.req_call++;
498                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
499                 break;
500
501         case CTDB_REQ_MESSAGE:
502                 ctdb->statistics.client.req_message++;
503                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
504                 break;
505
506         case CTDB_REQ_CONTROL:
507                 ctdb->statistics.client.req_control++;
508                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
509                 break;
510
511         default:
512                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
513                          hdr->operation));
514         }
515
516 done:
517         talloc_free(tmp_ctx);
518 }
519
520 /*
521   called when the daemon gets a incoming packet
522  */
523 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
524 {
525         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
526         struct ctdb_req_header *hdr;
527
528         if (cnt == 0) {
529                 talloc_free(client);
530                 return;
531         }
532
533         client->ctdb->statistics.client_packets_recv++;
534
535         if (cnt < sizeof(*hdr)) {
536                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
537                                (unsigned)cnt);
538                 return;
539         }
540         hdr = (struct ctdb_req_header *)data;
541         if (cnt != hdr->length) {
542                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
543                                (unsigned)hdr->length, (unsigned)cnt);
544                 return;
545         }
546
547         if (hdr->ctdb_magic != CTDB_MAGIC) {
548                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
549                 return;
550         }
551
552         if (hdr->ctdb_version != CTDB_VERSION) {
553                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
554                 return;
555         }
556
557         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
558                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
559                  hdr->srcnode, hdr->destnode));
560
561         /* it is the responsibility of the incoming packet function to free 'data' */
562         daemon_incoming_packet(client, hdr);
563 }
564
565
566 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
567 {
568         if (client_pid->ctdb->client_pids != NULL) {
569                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
570         }
571
572         return 0;
573 }
574
575
576 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
577                          uint16_t flags, void *private_data)
578 {
579         struct sockaddr_un addr;
580         socklen_t len;
581         int fd;
582         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
583         struct ctdb_client *client;
584         struct ctdb_client_pid_list *client_pid;
585 #ifdef _AIX
586         struct peercred_struct cr;
587         socklen_t crl = sizeof(struct peercred_struct);
588 #else
589         struct ucred cr;
590         socklen_t crl = sizeof(struct ucred);
591 #endif
592
593         memset(&addr, 0, sizeof(addr));
594         len = sizeof(addr);
595         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
596         if (fd == -1) {
597                 return;
598         }
599
600         set_nonblocking(fd);
601         set_close_on_exec(fd);
602
603         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
604
605         client = talloc_zero(ctdb, struct ctdb_client);
606 #ifdef _AIX
607         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
608 #else
609         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
610 #endif
611                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
612         }
613
614         client->ctdb = ctdb;
615         client->fd = fd;
616         client->client_id = ctdb_reqid_new(ctdb, client);
617         client->pid = cr.pid;
618
619         client_pid = talloc(client, struct ctdb_client_pid_list);
620         if (client_pid == NULL) {
621                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
622                 close(fd);
623                 talloc_free(client);
624                 return;
625         }               
626         client_pid->ctdb   = ctdb;
627         client_pid->pid    = cr.pid;
628         client_pid->client = client;
629
630         DLIST_ADD(ctdb->client_pids, client_pid);
631
632         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
633                                          ctdb_daemon_read_cb, client,
634                                          "client-%u", client->pid);
635
636         talloc_set_destructor(client, ctdb_client_destructor);
637         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
638         ctdb->statistics.num_clients++;
639 }
640
641
642
643 /*
644   create a unix domain socket and bind it
645   return a file descriptor open on the socket 
646 */
647 static int ux_socket_bind(struct ctdb_context *ctdb)
648 {
649         struct sockaddr_un addr;
650
651         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
652         if (ctdb->daemon.sd == -1) {
653                 return -1;
654         }
655
656         set_close_on_exec(ctdb->daemon.sd);
657         set_nonblocking(ctdb->daemon.sd);
658
659         memset(&addr, 0, sizeof(addr));
660         addr.sun_family = AF_UNIX;
661         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
662
663         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
664                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
665                 goto failed;
666         }       
667
668         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
669             chmod(ctdb->daemon.name, 0700) != 0) {
670                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
671                 goto failed;
672         } 
673
674
675         if (listen(ctdb->daemon.sd, 100) != 0) {
676                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
677                 goto failed;
678         }
679
680         return 0;
681
682 failed:
683         close(ctdb->daemon.sd);
684         ctdb->daemon.sd = -1;
685         return -1;      
686 }
687
688 static void sig_child_handler(struct event_context *ev,
689         struct signal_event *se, int signum, int count,
690         void *dont_care, 
691         void *private_data)
692 {
693 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
694         int status;
695         pid_t pid = -1;
696
697         while (pid != 0) {
698                 pid = waitpid(-1, &status, WNOHANG);
699                 if (pid == -1) {
700                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
701                         return;
702                 }
703                 if (pid > 0) {
704                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
705                 }
706         }
707 }
708
709 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
710                                       void *private_data)
711 {
712         if (status != 0) {
713                 ctdb_fatal(ctdb, "Failed to run setup event\n");
714                 return;
715         }
716         ctdb_run_notification_script(ctdb, "setup");
717
718         /* tell all other nodes we've just started up */
719         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
720                                  0, CTDB_CONTROL_STARTUP, 0,
721                                  CTDB_CTRL_FLAG_NOREPLY,
722                                  tdb_null, NULL, NULL);
723 }
724
725 /*
726   start the protocol going as a daemon
727 */
728 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
729 {
730         int res, ret = -1;
731         struct fd_event *fde;
732         const char *domain_socket_name;
733         struct signal_event *se;
734
735         /* get rid of any old sockets */
736         unlink(ctdb->daemon.name);
737
738         /* create a unix domain stream socket to listen to */
739         res = ux_socket_bind(ctdb);
740         if (res!=0) {
741                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
742                 exit(10);
743         }
744
745         if (do_fork && fork()) {
746                 return 0;
747         }
748
749         tdb_reopen_all(False);
750
751         if (do_fork) {
752                 setsid();
753                 close(0);
754                 if (open("/dev/null", O_RDONLY) != 0) {
755                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
756                         exit(11);
757                 }
758         }
759         block_signal(SIGPIPE);
760
761         ctdbd_pid = getpid();
762
763
764         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
765
766         ctdb_high_priority(ctdb);
767
768         /* ensure the socket is deleted on exit of the daemon */
769         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
770         if (domain_socket_name == NULL) {
771                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
772                 exit(12);
773         }
774
775         ctdb->ev = event_context_init(NULL);
776         tevent_loop_allow_nesting(ctdb->ev);
777         ret = ctdb_init_tevent_logging(ctdb);
778         if (ret != 0) {
779                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
780                 exit(1);
781         }
782
783         ctdb_set_child_logging(ctdb);
784
785         /* force initial recovery for election */
786         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
787
788         if (strcmp(ctdb->transport, "tcp") == 0) {
789                 int ctdb_tcp_init(struct ctdb_context *);
790                 ret = ctdb_tcp_init(ctdb);
791         }
792 #ifdef USE_INFINIBAND
793         if (strcmp(ctdb->transport, "ib") == 0) {
794                 int ctdb_ibw_init(struct ctdb_context *);
795                 ret = ctdb_ibw_init(ctdb);
796         }
797 #endif
798         if (ret != 0) {
799                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
800                 return -1;
801         }
802
803         if (ctdb->methods == NULL) {
804                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
805                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
806         }
807
808         /* initialise the transport  */
809         if (ctdb->methods->initialise(ctdb) != 0) {
810                 ctdb_fatal(ctdb, "transport failed to initialise");
811         }
812
813         /* attach to existing databases */
814         if (ctdb_attach_databases(ctdb) != 0) {
815                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
816         }
817
818         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
819         if (ret != 0) {
820                 ctdb_fatal(ctdb, "Failed to run init event\n");
821         }
822         ctdb_run_notification_script(ctdb, "init");
823
824         /* start frozen, then let the first election sort things out */
825         if (ctdb_blocking_freeze(ctdb)) {
826                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
827         }
828
829         /* now start accepting clients, only can do this once frozen */
830         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
831                            EVENT_FD_READ,
832                            ctdb_accept_client, ctdb);
833         tevent_fd_set_auto_close(fde);
834
835         /* release any IPs we hold from previous runs of the daemon */
836         ctdb_release_all_ips(ctdb);
837
838         /* start the transport going */
839         ctdb_start_transport(ctdb);
840
841         /* set up a handler to pick up sigchld */
842         se = event_add_signal(ctdb->ev, ctdb,
843                                      SIGCHLD, 0,
844                                      sig_child_handler,
845                                      ctdb);
846         if (se == NULL) {
847                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
848                 exit(1);
849         }
850
851         ret = ctdb_event_script_callback(ctdb,
852                                          ctdb,
853                                          ctdb_setup_event_callback,
854                                          ctdb,
855                                          false,
856                                          CTDB_EVENT_SETUP,
857                                          "");
858         if (ret != 0) {
859                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
860                 exit(1);
861         }
862
863         if (use_syslog) {
864                 if (start_syslog_daemon(ctdb)) {
865                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
866                         exit(10);
867                 }
868         }
869
870         ctdb_lockdown_memory(ctdb);
871           
872         /* go into a wait loop to allow other nodes to complete */
873         event_loop_wait(ctdb->ev);
874
875         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
876         exit(1);
877 }
878
879 /*
880   allocate a packet for use in daemon<->daemon communication
881  */
882 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
883                                                  TALLOC_CTX *mem_ctx, 
884                                                  enum ctdb_operation operation, 
885                                                  size_t length, size_t slength,
886                                                  const char *type)
887 {
888         int size;
889         struct ctdb_req_header *hdr;
890
891         length = MAX(length, slength);
892         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
893
894         if (ctdb->methods == NULL) {
895                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
896                          operation, (unsigned)length));
897                 return NULL;
898         }
899
900         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
901         if (hdr == NULL) {
902                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
903                          operation, (unsigned)length));
904                 return NULL;
905         }
906         talloc_set_name_const(hdr, type);
907         memset(hdr, 0, slength);
908         hdr->length       = length;
909         hdr->operation    = operation;
910         hdr->ctdb_magic   = CTDB_MAGIC;
911         hdr->ctdb_version = CTDB_VERSION;
912         hdr->generation   = ctdb->vnn_map->generation;
913         hdr->srcnode      = ctdb->pnn;
914
915         return hdr;     
916 }
917
918 struct daemon_control_state {
919         struct daemon_control_state *next, *prev;
920         struct ctdb_client *client;
921         struct ctdb_req_control *c;
922         uint32_t reqid;
923         struct ctdb_node *node;
924 };
925
926 /*
927   callback when a control reply comes in
928  */
929 static void daemon_control_callback(struct ctdb_context *ctdb,
930                                     int32_t status, TDB_DATA data, 
931                                     const char *errormsg,
932                                     void *private_data)
933 {
934         struct daemon_control_state *state = talloc_get_type(private_data, 
935                                                              struct daemon_control_state);
936         struct ctdb_client *client = state->client;
937         struct ctdb_reply_control *r;
938         size_t len;
939         int ret;
940
941         /* construct a message to send to the client containing the data */
942         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
943         if (errormsg) {
944                 len += strlen(errormsg);
945         }
946         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
947                                struct ctdb_reply_control);
948         CTDB_NO_MEMORY_VOID(ctdb, r);
949
950         r->hdr.reqid     = state->reqid;
951         r->status        = status;
952         r->datalen       = data.dsize;
953         r->errorlen = 0;
954         memcpy(&r->data[0], data.dptr, data.dsize);
955         if (errormsg) {
956                 r->errorlen = strlen(errormsg);
957                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
958         }
959
960         ret = daemon_queue_send(client, &r->hdr);
961         if (ret != -1) {
962                 talloc_free(state);
963         }
964 }
965
966 /*
967   fail all pending controls to a disconnected node
968  */
969 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
970 {
971         struct daemon_control_state *state;
972         while ((state = node->pending_controls)) {
973                 DLIST_REMOVE(node->pending_controls, state);
974                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
975                                         "node is disconnected", state);
976         }
977 }
978
979 /*
980   destroy a daemon_control_state
981  */
982 static int daemon_control_destructor(struct daemon_control_state *state)
983 {
984         if (state->node) {
985                 DLIST_REMOVE(state->node->pending_controls, state);
986         }
987         return 0;
988 }
989
990 /*
991   this is called when the ctdb daemon received a ctdb request control
992   from a local client over the unix domain socket
993  */
994 static void daemon_request_control_from_client(struct ctdb_client *client, 
995                                                struct ctdb_req_control *c)
996 {
997         TDB_DATA data;
998         int res;
999         struct daemon_control_state *state;
1000         TALLOC_CTX *tmp_ctx = talloc_new(client);
1001
1002         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1003                 c->hdr.destnode = client->ctdb->pnn;
1004         }
1005
1006         state = talloc(client, struct daemon_control_state);
1007         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1008
1009         state->client = client;
1010         state->c = talloc_steal(state, c);
1011         state->reqid = c->hdr.reqid;
1012         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1013                 state->node = client->ctdb->nodes[c->hdr.destnode];
1014                 DLIST_ADD(state->node->pending_controls, state);
1015         } else {
1016                 state->node = NULL;
1017         }
1018
1019         talloc_set_destructor(state, daemon_control_destructor);
1020
1021         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1022                 talloc_steal(tmp_ctx, state);
1023         }
1024         
1025         data.dptr = &c->data[0];
1026         data.dsize = c->datalen;
1027         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1028                                        c->srvid, c->opcode, client->client_id,
1029                                        c->flags,
1030                                        data, daemon_control_callback,
1031                                        state);
1032         if (res != 0) {
1033                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1034                          c->hdr.destnode));
1035         }
1036
1037         talloc_free(tmp_ctx);
1038 }
1039
1040 /*
1041   register a call function
1042 */
1043 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1044                          ctdb_fn_t fn, int id)
1045 {
1046         struct ctdb_registered_call *call;
1047         struct ctdb_db_context *ctdb_db;
1048
1049         ctdb_db = find_ctdb_db(ctdb, db_id);
1050         if (ctdb_db == NULL) {
1051                 return -1;
1052         }
1053
1054         call = talloc(ctdb_db, struct ctdb_registered_call);
1055         call->fn = fn;
1056         call->id = id;
1057
1058         DLIST_ADD(ctdb_db->calls, call);        
1059         return 0;
1060 }
1061
1062
1063
1064 /*
1065   this local messaging handler is ugly, but is needed to prevent
1066   recursion in ctdb_send_message() when the destination node is the
1067   same as the source node
1068  */
1069 struct ctdb_local_message {
1070         struct ctdb_context *ctdb;
1071         uint64_t srvid;
1072         TDB_DATA data;
1073 };
1074
1075 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1076                                        struct timeval t, void *private_data)
1077 {
1078         struct ctdb_local_message *m = talloc_get_type(private_data, 
1079                                                        struct ctdb_local_message);
1080         int res;
1081
1082         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1083         if (res != 0) {
1084                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1085                           (unsigned long long)m->srvid));
1086         }
1087         talloc_free(m);
1088 }
1089
1090 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1091 {
1092         struct ctdb_local_message *m;
1093         m = talloc(ctdb, struct ctdb_local_message);
1094         CTDB_NO_MEMORY(ctdb, m);
1095
1096         m->ctdb = ctdb;
1097         m->srvid = srvid;
1098         m->data  = data;
1099         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1100         if (m->data.dptr == NULL) {
1101                 talloc_free(m);
1102                 return -1;
1103         }
1104
1105         /* this needs to be done as an event to prevent recursion */
1106         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1107         return 0;
1108 }
1109
1110 /*
1111   send a ctdb message
1112 */
1113 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1114                              uint64_t srvid, TDB_DATA data)
1115 {
1116         struct ctdb_req_message *r;
1117         int len;
1118
1119         if (ctdb->methods == NULL) {
1120                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1121                 return -1;
1122         }
1123
1124         /* see if this is a message to ourselves */
1125         if (pnn == ctdb->pnn) {
1126                 return ctdb_local_message(ctdb, srvid, data);
1127         }
1128
1129         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1130         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1131                                     struct ctdb_req_message);
1132         CTDB_NO_MEMORY(ctdb, r);
1133
1134         r->hdr.destnode  = pnn;
1135         r->srvid         = srvid;
1136         r->datalen       = data.dsize;
1137         memcpy(&r->data[0], data.dptr, data.dsize);
1138
1139         ctdb_queue_packet(ctdb, &r->hdr);
1140
1141         talloc_free(r);
1142         return 0;
1143 }
1144
1145
1146
1147 struct ctdb_client_notify_list {
1148         struct ctdb_client_notify_list *next, *prev;
1149         struct ctdb_context *ctdb;
1150         uint64_t srvid;
1151         TDB_DATA data;
1152 };
1153
1154
1155 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1156 {
1157         int ret;
1158
1159         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1160
1161         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1162         if (ret != 0) {
1163                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1164         }
1165
1166         return 0;
1167 }
1168
1169 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1170 {
1171         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1172         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1173         struct ctdb_client_notify_list *nl;
1174
1175         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1176
1177         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1178                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1179                 return -1;
1180         }
1181
1182         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1183                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1184                 return -1;
1185         }
1186
1187
1188         if (client == NULL) {
1189                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1190                 return -1;
1191         }
1192
1193         for(nl=client->notify; nl; nl=nl->next) {
1194                 if (nl->srvid == notify->srvid) {
1195                         break;
1196                 }
1197         }
1198         if (nl != NULL) {
1199                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1200                 return -1;
1201         }
1202
1203         nl = talloc(client, struct ctdb_client_notify_list);
1204         CTDB_NO_MEMORY(ctdb, nl);
1205         nl->ctdb       = ctdb;
1206         nl->srvid      = notify->srvid;
1207         nl->data.dsize = notify->len;
1208         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1209         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1210         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1211         
1212         DLIST_ADD(client->notify, nl);
1213         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1214
1215         return 0;
1216 }
1217
1218 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1219 {
1220         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1221         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1222         struct ctdb_client_notify_list *nl;
1223
1224         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1225
1226         if (client == NULL) {
1227                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1228                 return -1;
1229         }
1230
1231         for(nl=client->notify; nl; nl=nl->next) {
1232                 if (nl->srvid == notify->srvid) {
1233                         break;
1234                 }
1235         }
1236         if (nl == NULL) {
1237                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1238                 return -1;
1239         }
1240
1241         DLIST_REMOVE(client->notify, nl);
1242         talloc_set_destructor(nl, NULL);
1243         talloc_free(nl);
1244
1245         return 0;
1246 }
1247
1248 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1249 {
1250         struct ctdb_client_pid_list *client_pid;
1251
1252         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1253                 if (client_pid->pid == pid) {
1254                         return client_pid->client;
1255                 }
1256         }
1257         return NULL;
1258 }
1259
1260
1261 /* This control is used by samba when probing if a process (of a samba daemon)
1262    exists on the node.
1263    Samba does this when it needs/wants to check if a subrecord in one of the
1264    databases is still valied, or if it is stale and can be removed.
1265    If the node is in unhealthy or stopped state we just kill of the samba
1266    process holding htis sub-record and return to the calling samba that
1267    the process does not exist.
1268    This allows us to forcefully recall subrecords registered by samba processes
1269    on banned and stopped nodes.
1270 */
1271 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1272 {
1273         struct ctdb_client *client;
1274
1275         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1276                 client = ctdb_find_client_by_pid(ctdb, pid);
1277                 if (client != NULL) {
1278                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1279                         talloc_free(client);
1280                 }
1281                 return -1;
1282         }
1283
1284         return kill(pid, 0);
1285 }