Merge branch 'master' of git://git.samba.org/sahlberg/ctdb
[metze/ctdb/wip.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
104                         talloc_free(client);
105                         return -1;
106                 }
107         }
108         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
109 }
110
111 /*
112   message handler for when we are in daemon mode. This redirects the message
113   to the right client
114  */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
116                                     TDB_DATA data, void *private_data)
117 {
118         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119         struct ctdb_req_message *r;
120         int len;
121
122         /* construct a message to send to the client containing the data */
123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
124         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
125                                len, struct ctdb_req_message);
126         CTDB_NO_MEMORY_VOID(ctdb, r);
127
128         talloc_set_name_const(r, "req_message packet");
129
130         r->srvid         = srvid;
131         r->datalen       = data.dsize;
132         memcpy(&r->data[0], data.dptr, data.dsize);
133
134         daemon_queue_send(client, &r->hdr);
135
136         talloc_free(r);
137 }
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         if (client->ctdb->statistics.num_clients) {
188                 client->ctdb->statistics.num_clients--;
189         }
190
191         if (client->num_persistent_updates != 0) {
192                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
193                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
194         }
195         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
196         if (ctdb_db) {
197                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
198                                   "commit active. Forcing recovery.\n"));
199                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
200                 ctdb_db->transaction_active = false;
201         }
202
203         return 0;
204 }
205
206
207 /*
208   this is called when the ctdb daemon received a ctdb request message
209   from a local client over the unix domain socket
210  */
211 static void daemon_request_message_from_client(struct ctdb_client *client, 
212                                                struct ctdb_req_message *c)
213 {
214         TDB_DATA data;
215         int res;
216
217         /* maybe the message is for another client on this node */
218         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
219                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
220                 return;
221         }
222
223         /* its for a remote node */
224         data.dptr = &c->data[0];
225         data.dsize = c->datalen;
226         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
227                                        c->srvid, data);
228         if (res != 0) {
229                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
230                          c->hdr.destnode));
231         }
232 }
233
234
235 struct daemon_call_state {
236         struct ctdb_client *client;
237         uint32_t reqid;
238         struct ctdb_call *call;
239         struct timeval start_time;
240 };
241
242 /* 
243    complete a call from a client 
244 */
245 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
246 {
247         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
248                                                            struct daemon_call_state);
249         struct ctdb_reply_call *r;
250         int res;
251         uint32_t length;
252         struct ctdb_client *client = dstate->client;
253         struct ctdb_db_context *ctdb_db = state->ctdb_db;
254
255         talloc_steal(client, dstate);
256         talloc_steal(dstate, dstate->call);
257
258         res = ctdb_daemon_call_recv(state, dstate->call);
259         if (res != 0) {
260                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
261                 if (client->ctdb->statistics.pending_calls > 0) {
262                         client->ctdb->statistics.pending_calls--;
263                 }
264                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
265                 return;
266         }
267
268         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
269         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
270                                length, struct ctdb_reply_call);
271         if (r == NULL) {
272                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
273                 if (client->ctdb->statistics.pending_calls > 0) {
274                         client->ctdb->statistics.pending_calls--;
275                 }
276                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
277                 return;
278         }
279         r->hdr.reqid        = dstate->reqid;
280         r->datalen          = dstate->call->reply_data.dsize;
281         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
282
283         res = daemon_queue_send(client, &r->hdr);
284         if (res == -1) {
285                 /* client is dead - return immediately */
286                 return;
287         }
288         if (res != 0) {
289                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
290         }
291         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
292         talloc_free(dstate);
293         if (client->ctdb->statistics.pending_calls > 0) {
294                 client->ctdb->statistics.pending_calls--;
295         }
296 }
297
298 struct ctdb_daemon_packet_wrap {
299         struct ctdb_context *ctdb;
300         uint32_t client_id;
301 };
302
303 /*
304   a wrapper to catch disconnected clients
305  */
306 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
307 {
308         struct ctdb_client *client;
309         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
310                                                             struct ctdb_daemon_packet_wrap);
311         if (w == NULL) {
312                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
313                 return;
314         }
315
316         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
317         if (client == NULL) {
318                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
319                          w->client_id));
320                 talloc_free(w);
321                 return;
322         }
323         talloc_free(w);
324
325         /* process it */
326         daemon_incoming_packet(client, hdr);    
327 }
328
329
330 /*
331   this is called when the ctdb daemon received a ctdb request call
332   from a local client over the unix domain socket
333  */
334 static void daemon_request_call_from_client(struct ctdb_client *client, 
335                                             struct ctdb_req_call *c)
336 {
337         struct ctdb_call_state *state;
338         struct ctdb_db_context *ctdb_db;
339         struct daemon_call_state *dstate;
340         struct ctdb_call *call;
341         struct ctdb_ltdb_header header;
342         TDB_DATA key, data;
343         int ret;
344         struct ctdb_context *ctdb = client->ctdb;
345         struct ctdb_daemon_packet_wrap *w;
346
347         ctdb->statistics.total_calls++;
348         if (client->ctdb->statistics.pending_calls > 0) {
349                 ctdb->statistics.pending_calls++;
350         }
351
352         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
353         if (!ctdb_db) {
354                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
355                           c->db_id));
356                 if (client->ctdb->statistics.pending_calls > 0) {
357                         ctdb->statistics.pending_calls--;
358                 }
359                 return;
360         }
361
362         if (ctdb_db->unhealthy_reason) {
363                 /*
364                  * this is just a warning, as the tdb should be empty anyway,
365                  * and only persistent databases can be unhealthy, which doesn't
366                  * use this code patch
367                  */
368                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
369                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
370         }
371
372         key.dptr = c->data;
373         key.dsize = c->keylen;
374
375         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
376         CTDB_NO_MEMORY_VOID(ctdb, w);   
377
378         w->ctdb = ctdb;
379         w->client_id = client->client_id;
380
381         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
382                                            (struct ctdb_req_header *)c, &data,
383                                            daemon_incoming_packet_wrap, w, True);
384         if (ret == -2) {
385                 /* will retry later */
386                 if (client->ctdb->statistics.pending_calls > 0) {
387                         ctdb->statistics.pending_calls--;
388                 }
389                 return;
390         }
391
392         talloc_free(w);
393
394         if (ret != 0) {
395                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
396                 if (client->ctdb->statistics.pending_calls > 0) {
397                         ctdb->statistics.pending_calls--;
398                 }
399                 return;
400         }
401
402         dstate = talloc(client, struct daemon_call_state);
403         if (dstate == NULL) {
404                 ctdb_ltdb_unlock(ctdb_db, key);
405                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
406                 if (client->ctdb->statistics.pending_calls > 0) {
407                         ctdb->statistics.pending_calls--;
408                 }
409                 return;
410         }
411         dstate->start_time = timeval_current();
412         dstate->client = client;
413         dstate->reqid  = c->hdr.reqid;
414         talloc_steal(dstate, data.dptr);
415
416         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
417         if (call == NULL) {
418                 ctdb_ltdb_unlock(ctdb_db, key);
419                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
420                 if (client->ctdb->statistics.pending_calls > 0) {
421                         ctdb->statistics.pending_calls--;
422                 }
423                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
424                 return;
425         }
426
427         call->call_id = c->callid;
428         call->key = key;
429         call->call_data.dptr = c->data + c->keylen;
430         call->call_data.dsize = c->calldatalen;
431         call->flags = c->flags;
432
433         if (header.dmaster == ctdb->pnn) {
434                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
435         } else {
436                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
437         }
438
439         ctdb_ltdb_unlock(ctdb_db, key);
440
441         if (state == NULL) {
442                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
443                 if (client->ctdb->statistics.pending_calls > 0) {
444                         ctdb->statistics.pending_calls--;
445                 }
446                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
447                 return;
448         }
449         talloc_steal(state, dstate);
450         talloc_steal(client, state);
451
452         state->async.fn = daemon_call_from_client_callback;
453         state->async.private_data = dstate;
454 }
455
456
457 static void daemon_request_control_from_client(struct ctdb_client *client, 
458                                                struct ctdb_req_control *c);
459
460 /* data contains a packet from the client */
461 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
462 {
463         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
464         TALLOC_CTX *tmp_ctx;
465         struct ctdb_context *ctdb = client->ctdb;
466
467         /* place the packet as a child of a tmp_ctx. We then use
468            talloc_free() below to free it. If any of the calls want
469            to keep it, then they will steal it somewhere else, and the
470            talloc_free() will be a no-op */
471         tmp_ctx = talloc_new(client);
472         talloc_steal(tmp_ctx, hdr);
473
474         if (hdr->ctdb_magic != CTDB_MAGIC) {
475                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
476                 goto done;
477         }
478
479         if (hdr->ctdb_version != CTDB_VERSION) {
480                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
481                 goto done;
482         }
483
484         switch (hdr->operation) {
485         case CTDB_REQ_CALL:
486                 ctdb->statistics.client.req_call++;
487                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
488                 break;
489
490         case CTDB_REQ_MESSAGE:
491                 ctdb->statistics.client.req_message++;
492                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
493                 break;
494
495         case CTDB_REQ_CONTROL:
496                 ctdb->statistics.client.req_control++;
497                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
498                 break;
499
500         default:
501                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
502                          hdr->operation));
503         }
504
505 done:
506         talloc_free(tmp_ctx);
507 }
508
509 /*
510   called when the daemon gets a incoming packet
511  */
512 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
513 {
514         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
515         struct ctdb_req_header *hdr;
516
517         if (cnt == 0) {
518                 talloc_free(client);
519                 return;
520         }
521
522         client->ctdb->statistics.client_packets_recv++;
523
524         if (cnt < sizeof(*hdr)) {
525                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
526                                (unsigned)cnt);
527                 return;
528         }
529         hdr = (struct ctdb_req_header *)data;
530         if (cnt != hdr->length) {
531                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
532                                (unsigned)hdr->length, (unsigned)cnt);
533                 return;
534         }
535
536         if (hdr->ctdb_magic != CTDB_MAGIC) {
537                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
538                 return;
539         }
540
541         if (hdr->ctdb_version != CTDB_VERSION) {
542                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
543                 return;
544         }
545
546         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
547                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
548                  hdr->srcnode, hdr->destnode));
549
550         /* it is the responsibility of the incoming packet function to free 'data' */
551         daemon_incoming_packet(client, hdr);
552 }
553
554
555 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
556 {
557         if (client_pid->ctdb->client_pids != NULL) {
558                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
559         }
560
561         return 0;
562 }
563
564
565 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
566                          uint16_t flags, void *private_data)
567 {
568         struct sockaddr_un addr;
569         socklen_t len;
570         int fd;
571         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
572         struct ctdb_client *client;
573         struct ctdb_client_pid_list *client_pid;
574 #ifdef _AIX
575         struct peercred_struct cr;
576         socklen_t crl = sizeof(struct peercred_struct);
577 #else
578         struct ucred cr;
579         socklen_t crl = sizeof(struct ucred);
580 #endif
581
582         memset(&addr, 0, sizeof(addr));
583         len = sizeof(addr);
584         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
585         if (fd == -1) {
586                 return;
587         }
588
589         set_nonblocking(fd);
590         set_close_on_exec(fd);
591
592         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
593
594         client = talloc_zero(ctdb, struct ctdb_client);
595 #ifdef _AIX
596         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
597 #else
598         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
599 #endif
600                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
601         }
602
603         client->ctdb = ctdb;
604         client->fd = fd;
605         client->client_id = ctdb_reqid_new(ctdb, client);
606         client->pid = cr.pid;
607
608         client_pid = talloc(client, struct ctdb_client_pid_list);
609         if (client_pid == NULL) {
610                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
611                 close(fd);
612                 talloc_free(client);
613                 return;
614         }               
615         client_pid->ctdb   = ctdb;
616         client_pid->pid    = cr.pid;
617         client_pid->client = client;
618
619         DLIST_ADD(ctdb->client_pids, client_pid);
620
621         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
622                                          ctdb_daemon_read_cb, client);
623
624         talloc_set_destructor(client, ctdb_client_destructor);
625         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
626         ctdb->statistics.num_clients++;
627 }
628
629
630
631 /*
632   create a unix domain socket and bind it
633   return a file descriptor open on the socket 
634 */
635 static int ux_socket_bind(struct ctdb_context *ctdb)
636 {
637         struct sockaddr_un addr;
638
639         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
640         if (ctdb->daemon.sd == -1) {
641                 return -1;
642         }
643
644         set_close_on_exec(ctdb->daemon.sd);
645         set_nonblocking(ctdb->daemon.sd);
646
647         memset(&addr, 0, sizeof(addr));
648         addr.sun_family = AF_UNIX;
649         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
650
651         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
652                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
653                 goto failed;
654         }       
655
656         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
657             chmod(ctdb->daemon.name, 0700) != 0) {
658                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
659                 goto failed;
660         } 
661
662
663         if (listen(ctdb->daemon.sd, 100) != 0) {
664                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
665                 goto failed;
666         }
667
668         return 0;
669
670 failed:
671         close(ctdb->daemon.sd);
672         ctdb->daemon.sd = -1;
673         return -1;      
674 }
675
676 static void sig_child_handler(struct event_context *ev,
677         struct signal_event *se, int signum, int count,
678         void *dont_care, 
679         void *private_data)
680 {
681 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
682         int status;
683         pid_t pid = -1;
684
685         while (pid != 0) {
686                 pid = waitpid(-1, &status, WNOHANG);
687                 if (pid == -1) {
688                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
689                         return;
690                 }
691                 if (pid > 0) {
692                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
693                 }
694         }
695 }
696
697 /*
698   start the protocol going as a daemon
699 */
700 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
701 {
702         int res, ret = -1;
703         struct fd_event *fde;
704         const char *domain_socket_name;
705         struct signal_event *se;
706
707         /* get rid of any old sockets */
708         unlink(ctdb->daemon.name);
709
710         /* create a unix domain stream socket to listen to */
711         res = ux_socket_bind(ctdb);
712         if (res!=0) {
713                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
714                 exit(10);
715         }
716
717         if (do_fork && fork()) {
718                 return 0;
719         }
720
721         tdb_reopen_all(False);
722
723         if (do_fork) {
724                 setsid();
725                 close(0);
726                 if (open("/dev/null", O_RDONLY) != 0) {
727                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
728                         exit(11);
729                 }
730         }
731         block_signal(SIGPIPE);
732
733         ctdbd_pid = getpid();
734
735
736         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
737
738         ctdb_high_priority(ctdb);
739
740         /* ensure the socket is deleted on exit of the daemon */
741         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
742         if (domain_socket_name == NULL) {
743                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
744                 exit(12);
745         }
746
747         ctdb->ev = event_context_init(NULL);
748
749         ctdb_set_child_logging(ctdb);
750
751         /* force initial recovery for election */
752         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
753
754         if (strcmp(ctdb->transport, "tcp") == 0) {
755                 int ctdb_tcp_init(struct ctdb_context *);
756                 ret = ctdb_tcp_init(ctdb);
757         }
758 #ifdef USE_INFINIBAND
759         if (strcmp(ctdb->transport, "ib") == 0) {
760                 int ctdb_ibw_init(struct ctdb_context *);
761                 ret = ctdb_ibw_init(ctdb);
762         }
763 #endif
764         if (ret != 0) {
765                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
766                 return -1;
767         }
768
769         if (ctdb->methods == NULL) {
770                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
771                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
772         }
773
774         /* initialise the transport  */
775         if (ctdb->methods->initialise(ctdb) != 0) {
776                 ctdb_fatal(ctdb, "transport failed to initialise");
777         }
778
779         /* attach to existing databases */
780         if (ctdb_attach_databases(ctdb) != 0) {
781                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
782         }
783
784         /* start frozen, then let the first election sort things out */
785         if (ctdb_blocking_freeze(ctdb)) {
786                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
787         }
788
789         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
790         if (ret != 0) {
791                 ctdb_fatal(ctdb, "Failed to run init event\n");
792         }
793         ctdb_run_notification_script(ctdb, "init");
794
795         /* now start accepting clients, only can do this once frozen */
796         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
797                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
798                            ctdb_accept_client, ctdb);
799
800         /* tell all other nodes we've just started up */
801         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
802                                  0, CTDB_CONTROL_STARTUP, 0,
803                                  CTDB_CTRL_FLAG_NOREPLY,
804                                  tdb_null, NULL, NULL);
805
806         /* release any IPs we hold from previous runs of the daemon */
807         ctdb_release_all_ips(ctdb);
808
809         /* start the transport going */
810         ctdb_start_transport(ctdb);
811
812         /* set up a handler to pick up sigchld */
813         se = event_add_signal(ctdb->ev, ctdb,
814                                      SIGCHLD, 0,
815                                      sig_child_handler,
816                                      ctdb);
817         if (se == NULL) {
818                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
819                 exit(1);
820         }
821
822         if (use_syslog) {
823                 if (start_syslog_daemon(ctdb)) {
824                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
825                         exit(10);
826                 }
827         }
828
829         ctdb_lockdown_memory(ctdb);
830           
831         /* go into a wait loop to allow other nodes to complete */
832         event_loop_wait(ctdb->ev);
833
834         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
835         exit(1);
836 }
837
838 /*
839   allocate a packet for use in daemon<->daemon communication
840  */
841 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
842                                                  TALLOC_CTX *mem_ctx, 
843                                                  enum ctdb_operation operation, 
844                                                  size_t length, size_t slength,
845                                                  const char *type)
846 {
847         int size;
848         struct ctdb_req_header *hdr;
849
850         length = MAX(length, slength);
851         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
852
853         if (ctdb->methods == NULL) {
854                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
855                          operation, (unsigned)length));
856                 return NULL;
857         }
858
859         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
860         if (hdr == NULL) {
861                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
862                          operation, (unsigned)length));
863                 return NULL;
864         }
865         talloc_set_name_const(hdr, type);
866         memset(hdr, 0, slength);
867         hdr->length       = length;
868         hdr->operation    = operation;
869         hdr->ctdb_magic   = CTDB_MAGIC;
870         hdr->ctdb_version = CTDB_VERSION;
871         hdr->generation   = ctdb->vnn_map->generation;
872         hdr->srcnode      = ctdb->pnn;
873
874         return hdr;     
875 }
876
877 struct daemon_control_state {
878         struct daemon_control_state *next, *prev;
879         struct ctdb_client *client;
880         struct ctdb_req_control *c;
881         uint32_t reqid;
882         struct ctdb_node *node;
883 };
884
885 /*
886   callback when a control reply comes in
887  */
888 static void daemon_control_callback(struct ctdb_context *ctdb,
889                                     int32_t status, TDB_DATA data, 
890                                     const char *errormsg,
891                                     void *private_data)
892 {
893         struct daemon_control_state *state = talloc_get_type(private_data, 
894                                                              struct daemon_control_state);
895         struct ctdb_client *client = state->client;
896         struct ctdb_reply_control *r;
897         size_t len;
898         int ret;
899
900         /* construct a message to send to the client containing the data */
901         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
902         if (errormsg) {
903                 len += strlen(errormsg);
904         }
905         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
906                                struct ctdb_reply_control);
907         CTDB_NO_MEMORY_VOID(ctdb, r);
908
909         r->hdr.reqid     = state->reqid;
910         r->status        = status;
911         r->datalen       = data.dsize;
912         r->errorlen = 0;
913         memcpy(&r->data[0], data.dptr, data.dsize);
914         if (errormsg) {
915                 r->errorlen = strlen(errormsg);
916                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
917         }
918
919         ret = daemon_queue_send(client, &r->hdr);
920         if (ret != -1) {
921                 talloc_free(state);
922         }
923 }
924
925 /*
926   fail all pending controls to a disconnected node
927  */
928 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
929 {
930         struct daemon_control_state *state;
931         while ((state = node->pending_controls)) {
932                 DLIST_REMOVE(node->pending_controls, state);
933                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
934                                         "node is disconnected", state);
935         }
936 }
937
938 /*
939   destroy a daemon_control_state
940  */
941 static int daemon_control_destructor(struct daemon_control_state *state)
942 {
943         if (state->node) {
944                 DLIST_REMOVE(state->node->pending_controls, state);
945         }
946         return 0;
947 }
948
949 /*
950   this is called when the ctdb daemon received a ctdb request control
951   from a local client over the unix domain socket
952  */
953 static void daemon_request_control_from_client(struct ctdb_client *client, 
954                                                struct ctdb_req_control *c)
955 {
956         TDB_DATA data;
957         int res;
958         struct daemon_control_state *state;
959         TALLOC_CTX *tmp_ctx = talloc_new(client);
960
961         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
962                 c->hdr.destnode = client->ctdb->pnn;
963         }
964
965         state = talloc(client, struct daemon_control_state);
966         CTDB_NO_MEMORY_VOID(client->ctdb, state);
967
968         state->client = client;
969         state->c = talloc_steal(state, c);
970         state->reqid = c->hdr.reqid;
971         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
972                 state->node = client->ctdb->nodes[c->hdr.destnode];
973                 DLIST_ADD(state->node->pending_controls, state);
974         } else {
975                 state->node = NULL;
976         }
977
978         talloc_set_destructor(state, daemon_control_destructor);
979
980         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
981                 talloc_steal(tmp_ctx, state);
982         }
983         
984         data.dptr = &c->data[0];
985         data.dsize = c->datalen;
986         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
987                                        c->srvid, c->opcode, client->client_id,
988                                        c->flags,
989                                        data, daemon_control_callback,
990                                        state);
991         if (res != 0) {
992                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
993                          c->hdr.destnode));
994         }
995
996         talloc_free(tmp_ctx);
997 }
998
999 /*
1000   register a call function
1001 */
1002 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1003                          ctdb_fn_t fn, int id)
1004 {
1005         struct ctdb_registered_call *call;
1006         struct ctdb_db_context *ctdb_db;
1007
1008         ctdb_db = find_ctdb_db(ctdb, db_id);
1009         if (ctdb_db == NULL) {
1010                 return -1;
1011         }
1012
1013         call = talloc(ctdb_db, struct ctdb_registered_call);
1014         call->fn = fn;
1015         call->id = id;
1016
1017         DLIST_ADD(ctdb_db->calls, call);        
1018         return 0;
1019 }
1020
1021
1022
1023 /*
1024   this local messaging handler is ugly, but is needed to prevent
1025   recursion in ctdb_send_message() when the destination node is the
1026   same as the source node
1027  */
1028 struct ctdb_local_message {
1029         struct ctdb_context *ctdb;
1030         uint64_t srvid;
1031         TDB_DATA data;
1032 };
1033
1034 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1035                                        struct timeval t, void *private_data)
1036 {
1037         struct ctdb_local_message *m = talloc_get_type(private_data, 
1038                                                        struct ctdb_local_message);
1039         int res;
1040
1041         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1042         if (res != 0) {
1043                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1044                           (unsigned long long)m->srvid));
1045         }
1046         talloc_free(m);
1047 }
1048
1049 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1050 {
1051         struct ctdb_local_message *m;
1052         m = talloc(ctdb, struct ctdb_local_message);
1053         CTDB_NO_MEMORY(ctdb, m);
1054
1055         m->ctdb = ctdb;
1056         m->srvid = srvid;
1057         m->data  = data;
1058         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1059         if (m->data.dptr == NULL) {
1060                 talloc_free(m);
1061                 return -1;
1062         }
1063
1064         /* this needs to be done as an event to prevent recursion */
1065         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1066         return 0;
1067 }
1068
1069 /*
1070   send a ctdb message
1071 */
1072 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1073                              uint64_t srvid, TDB_DATA data)
1074 {
1075         struct ctdb_req_message *r;
1076         int len;
1077
1078         if (ctdb->methods == NULL) {
1079                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1080                 return -1;
1081         }
1082
1083         /* see if this is a message to ourselves */
1084         if (pnn == ctdb->pnn) {
1085                 return ctdb_local_message(ctdb, srvid, data);
1086         }
1087
1088         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1089         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1090                                     struct ctdb_req_message);
1091         CTDB_NO_MEMORY(ctdb, r);
1092
1093         r->hdr.destnode  = pnn;
1094         r->srvid         = srvid;
1095         r->datalen       = data.dsize;
1096         memcpy(&r->data[0], data.dptr, data.dsize);
1097
1098         ctdb_queue_packet(ctdb, &r->hdr);
1099
1100         talloc_free(r);
1101         return 0;
1102 }
1103
1104
1105
1106 struct ctdb_client_notify_list {
1107         struct ctdb_client_notify_list *next, *prev;
1108         struct ctdb_context *ctdb;
1109         uint64_t srvid;
1110         TDB_DATA data;
1111 };
1112
1113
1114 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1115 {
1116         int ret;
1117
1118         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1119
1120         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1121         if (ret != 0) {
1122                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1123         }
1124
1125         return 0;
1126 }
1127
1128 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1129 {
1130         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1131         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1132         struct ctdb_client_notify_list *nl;
1133
1134         DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1135
1136         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1137                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1138                 return -1;
1139         }
1140
1141         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1142                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1143                 return -1;
1144         }
1145
1146
1147         if (client == NULL) {
1148                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1149                 return -1;
1150         }
1151
1152         for(nl=client->notify; nl; nl=nl->next) {
1153                 if (nl->srvid == notify->srvid) {
1154                         break;
1155                 }
1156         }
1157         if (nl != NULL) {
1158                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1159                 return -1;
1160         }
1161
1162         nl = talloc(client, struct ctdb_client_notify_list);
1163         CTDB_NO_MEMORY(ctdb, nl);
1164         nl->ctdb       = ctdb;
1165         nl->srvid      = notify->srvid;
1166         nl->data.dsize = notify->len;
1167         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1168         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1169         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1170         
1171         DLIST_ADD(client->notify, nl);
1172         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1173
1174         return 0;
1175 }
1176
1177 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1178 {
1179         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1180         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1181         struct ctdb_client_notify_list *nl;
1182
1183         DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1184
1185         if (client == NULL) {
1186                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1187                 return -1;
1188         }
1189
1190         for(nl=client->notify; nl; nl=nl->next) {
1191                 if (nl->srvid == notify->srvid) {
1192                         break;
1193                 }
1194         }
1195         if (nl == NULL) {
1196                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1197                 return -1;
1198         }
1199
1200         DLIST_REMOVE(client->notify, nl);
1201         talloc_set_destructor(nl, NULL);
1202         talloc_free(nl);
1203
1204         return 0;
1205 }
1206
1207 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1208 {
1209         struct ctdb_client_pid_list *client_pid;
1210
1211         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1212                 if (client_pid->pid == pid) {
1213                         return client_pid->client;
1214                 }
1215         }
1216         return NULL;
1217 }
1218
1219
1220 /* This control is used by samba when probing if a process (of a samba daemon)
1221    exists on the node.
1222    Samba does this when it needs/wants to check if a subrecord in one of the
1223    databases is still valied, or if it is stale and can be removed.
1224    If the node is in unhealthy or stopped state we just kill of the samba
1225    process holding htis sub-record and return to the calling samba that
1226    the process does not exist.
1227    This allows us to forcefully recall subrecords registered by samba processes
1228    on banned and stopped nodes.
1229 */
1230 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1231 {
1232         struct ctdb_client *client;
1233
1234         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1235                 client = ctdb_find_client_by_pid(ctdb, pid);
1236                 if (client != NULL) {
1237                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1238                         talloc_free(client);
1239                 }
1240                 return -1;
1241         }
1242
1243         return kill(pid, 0);
1244 }