Revert "Make fetch_locked more scalable"
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         client->ctdb->statistics.client_packets_sent++;
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("Drop CTDB_REQ_MESSAGE to client. Queue full.\n"));
104                         return 0;
105                 }
106         }
107         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
108 }
109
110 /*
111   message handler for when we are in daemon mode. This redirects the message
112   to the right client
113  */
114 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
115                                     TDB_DATA data, void *private_data)
116 {
117         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
118         struct ctdb_req_message *r;
119         int len;
120
121         /* construct a message to send to the client containing the data */
122         len = offsetof(struct ctdb_req_message, data) + data.dsize;
123         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
124                                len, struct ctdb_req_message);
125         CTDB_NO_MEMORY_VOID(ctdb, r);
126
127         talloc_set_name_const(r, "req_message packet");
128
129         r->srvid         = srvid;
130         r->datalen       = data.dsize;
131         memcpy(&r->data[0], data.dptr, data.dsize);
132
133         daemon_queue_send(client, &r->hdr);
134
135         talloc_free(r);
136 }
137
138 /*
139   this is called when the ctdb daemon received a ctdb request to 
140   set the srvid from the client
141  */
142 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
143 {
144         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
145         int res;
146         if (client == NULL) {
147                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
148                 return -1;
149         }
150         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
151         if (res != 0) {
152                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
153                          (unsigned long long)srvid));
154         } else {
155                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
156                          (unsigned long long)srvid));
157         }
158
159         return res;
160 }
161
162 /*
163   this is called when the ctdb daemon received a ctdb request to 
164   remove a srvid from the client
165  */
166 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
167 {
168         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
169         if (client == NULL) {
170                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
171                 return -1;
172         }
173         return ctdb_deregister_message_handler(ctdb, srvid, client);
174 }
175
176
177 /*
178   destroy a ctdb_client
179 */
180 static int ctdb_client_destructor(struct ctdb_client *client)
181 {
182         struct ctdb_db_context *ctdb_db;
183
184         ctdb_takeover_client_destructor_hook(client);
185         ctdb_reqid_remove(client->ctdb, client->client_id);
186         if (client->ctdb->statistics.num_clients) {
187                 client->ctdb->statistics.num_clients--;
188         }
189
190         if (client->num_persistent_updates != 0) {
191                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
192                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
193         }
194         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
195         if (ctdb_db) {
196                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
197                                   "commit active. Forcing recovery.\n"));
198                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
199                 ctdb_db->transaction_active = false;
200         }
201
202         return 0;
203 }
204
205
206 /*
207   this is called when the ctdb daemon received a ctdb request message
208   from a local client over the unix domain socket
209  */
210 static void daemon_request_message_from_client(struct ctdb_client *client, 
211                                                struct ctdb_req_message *c)
212 {
213         TDB_DATA data;
214         int res;
215
216         /* maybe the message is for another client on this node */
217         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
218                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
219                 return;
220         }
221
222         /* its for a remote node */
223         data.dptr = &c->data[0];
224         data.dsize = c->datalen;
225         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
226                                        c->srvid, data);
227         if (res != 0) {
228                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
229                          c->hdr.destnode));
230         }
231 }
232
233
234 struct daemon_call_state {
235         struct ctdb_client *client;
236         uint32_t reqid;
237         struct ctdb_call *call;
238         struct timeval start_time;
239 };
240
241 /* 
242    complete a call from a client 
243 */
244 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
245 {
246         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
247                                                            struct daemon_call_state);
248         struct ctdb_reply_call *r;
249         int res;
250         uint32_t length;
251         struct ctdb_client *client = dstate->client;
252         struct ctdb_db_context *ctdb_db = state->ctdb_db;
253
254         talloc_steal(client, dstate);
255         talloc_steal(dstate, dstate->call);
256
257         res = ctdb_daemon_call_recv(state, dstate->call);
258         if (res != 0) {
259                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
260                 if (client->ctdb->statistics.pending_calls > 0) {
261                         client->ctdb->statistics.pending_calls--;
262                 }
263                 ctdb_latency(ctdb_db, "call_from_client_cb 1", &client->ctdb->statistics.max_call_latency, dstate->start_time);
264                 return;
265         }
266
267         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
268         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
269                                length, struct ctdb_reply_call);
270         if (r == NULL) {
271                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
272                 if (client->ctdb->statistics.pending_calls > 0) {
273                         client->ctdb->statistics.pending_calls--;
274                 }
275                 ctdb_latency(ctdb_db, "call_from_client_cb 2", &client->ctdb->statistics.max_call_latency, dstate->start_time);
276                 return;
277         }
278         r->hdr.reqid        = dstate->reqid;
279         r->datalen          = dstate->call->reply_data.dsize;
280         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
281
282         res = daemon_queue_send(client, &r->hdr);
283         if (res != 0) {
284                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
285         }
286         ctdb_latency(ctdb_db, "call_from_client_cb 3", &client->ctdb->statistics.max_call_latency, dstate->start_time);
287         talloc_free(dstate);
288         if (client->ctdb->statistics.pending_calls > 0) {
289                 client->ctdb->statistics.pending_calls--;
290         }
291 }
292
293 struct ctdb_daemon_packet_wrap {
294         struct ctdb_context *ctdb;
295         uint32_t client_id;
296 };
297
298 /*
299   a wrapper to catch disconnected clients
300  */
301 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
302 {
303         struct ctdb_client *client;
304         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
305                                                             struct ctdb_daemon_packet_wrap);
306         if (w == NULL) {
307                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
308                 return;
309         }
310
311         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
312         if (client == NULL) {
313                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
314                          w->client_id));
315                 talloc_free(w);
316                 return;
317         }
318         talloc_free(w);
319
320         /* process it */
321         daemon_incoming_packet(client, hdr);    
322 }
323
324
325 /*
326   this is called when the ctdb daemon received a ctdb request call
327   from a local client over the unix domain socket
328  */
329 static void daemon_request_call_from_client(struct ctdb_client *client, 
330                                             struct ctdb_req_call *c)
331 {
332         struct ctdb_call_state *state;
333         struct ctdb_db_context *ctdb_db;
334         struct daemon_call_state *dstate;
335         struct ctdb_call *call;
336         struct ctdb_ltdb_header header;
337         TDB_DATA key, data;
338         int ret;
339         struct ctdb_context *ctdb = client->ctdb;
340         struct ctdb_daemon_packet_wrap *w;
341
342         ctdb->statistics.total_calls++;
343         if (client->ctdb->statistics.pending_calls > 0) {
344                 ctdb->statistics.pending_calls++;
345         }
346
347         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
348         if (!ctdb_db) {
349                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
350                           c->db_id));
351                 if (client->ctdb->statistics.pending_calls > 0) {
352                         ctdb->statistics.pending_calls--;
353                 }
354                 return;
355         }
356
357         key.dptr = c->data;
358         key.dsize = c->keylen;
359
360         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
361         CTDB_NO_MEMORY_VOID(ctdb, w);   
362
363         w->ctdb = ctdb;
364         w->client_id = client->client_id;
365
366         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
367                                            (struct ctdb_req_header *)c, &data,
368                                            daemon_incoming_packet_wrap, w, True);
369         if (ret == -2) {
370                 /* will retry later */
371                 if (client->ctdb->statistics.pending_calls > 0) {
372                         ctdb->statistics.pending_calls--;
373                 }
374                 return;
375         }
376
377         talloc_free(w);
378
379         if (ret != 0) {
380                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
381                 if (client->ctdb->statistics.pending_calls > 0) {
382                         ctdb->statistics.pending_calls--;
383                 }
384                 return;
385         }
386
387         dstate = talloc(client, struct daemon_call_state);
388         if (dstate == NULL) {
389                 ctdb_ltdb_unlock(ctdb_db, key);
390                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
391                 if (client->ctdb->statistics.pending_calls > 0) {
392                         ctdb->statistics.pending_calls--;
393                 }
394                 return;
395         }
396         dstate->start_time = timeval_current();
397         dstate->client = client;
398         dstate->reqid  = c->hdr.reqid;
399         talloc_steal(dstate, data.dptr);
400
401         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
402         if (call == NULL) {
403                 ctdb_ltdb_unlock(ctdb_db, key);
404                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
405                 if (client->ctdb->statistics.pending_calls > 0) {
406                         ctdb->statistics.pending_calls--;
407                 }
408                 ctdb_latency(ctdb_db, "call_from_client 1", &ctdb->statistics.max_call_latency, dstate->start_time);
409                 return;
410         }
411
412         call->call_id = c->callid;
413         call->key = key;
414         call->call_data.dptr = c->data + c->keylen;
415         call->call_data.dsize = c->calldatalen;
416         call->flags = c->flags;
417
418         if (header.dmaster == ctdb->pnn) {
419                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
420         } else {
421                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
422         }
423
424         ctdb_ltdb_unlock(ctdb_db, key);
425
426         if (state == NULL) {
427                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
428                 if (client->ctdb->statistics.pending_calls > 0) {
429                         ctdb->statistics.pending_calls--;
430                 }
431                 ctdb_latency(ctdb_db, "call_from_client 2", &ctdb->statistics.max_call_latency, dstate->start_time);
432                 return;
433         }
434         talloc_steal(state, dstate);
435         talloc_steal(client, state);
436
437         state->async.fn = daemon_call_from_client_callback;
438         state->async.private_data = dstate;
439 }
440
441
442 static void daemon_request_control_from_client(struct ctdb_client *client, 
443                                                struct ctdb_req_control *c);
444
445 /* data contains a packet from the client */
446 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
447 {
448         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
449         TALLOC_CTX *tmp_ctx;
450         struct ctdb_context *ctdb = client->ctdb;
451
452         /* place the packet as a child of a tmp_ctx. We then use
453            talloc_free() below to free it. If any of the calls want
454            to keep it, then they will steal it somewhere else, and the
455            talloc_free() will be a no-op */
456         tmp_ctx = talloc_new(client);
457         talloc_steal(tmp_ctx, hdr);
458
459         if (hdr->ctdb_magic != CTDB_MAGIC) {
460                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
461                 goto done;
462         }
463
464         if (hdr->ctdb_version != CTDB_VERSION) {
465                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
466                 goto done;
467         }
468
469         switch (hdr->operation) {
470         case CTDB_REQ_CALL:
471                 ctdb->statistics.client.req_call++;
472                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
473                 break;
474
475         case CTDB_REQ_MESSAGE:
476                 ctdb->statistics.client.req_message++;
477                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
478                 break;
479
480         case CTDB_REQ_CONTROL:
481                 ctdb->statistics.client.req_control++;
482                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
483                 break;
484
485         default:
486                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
487                          hdr->operation));
488         }
489
490 done:
491         talloc_free(tmp_ctx);
492 }
493
494 /*
495   called when the daemon gets a incoming packet
496  */
497 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
498 {
499         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
500         struct ctdb_req_header *hdr;
501
502         if (cnt == 0) {
503                 talloc_free(client);
504                 return;
505         }
506
507         client->ctdb->statistics.client_packets_recv++;
508
509         if (cnt < sizeof(*hdr)) {
510                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
511                                (unsigned)cnt);
512                 return;
513         }
514         hdr = (struct ctdb_req_header *)data;
515         if (cnt != hdr->length) {
516                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
517                                (unsigned)hdr->length, (unsigned)cnt);
518                 return;
519         }
520
521         if (hdr->ctdb_magic != CTDB_MAGIC) {
522                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
523                 return;
524         }
525
526         if (hdr->ctdb_version != CTDB_VERSION) {
527                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
528                 return;
529         }
530
531         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
532                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
533                  hdr->srcnode, hdr->destnode));
534
535         /* it is the responsibility of the incoming packet function to free 'data' */
536         daemon_incoming_packet(client, hdr);
537 }
538
539
540 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
541 {
542         if (client_pid->ctdb->client_pids != NULL) {
543                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
544         }
545
546         return 0;
547 }
548
549
550 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
551                          uint16_t flags, void *private_data)
552 {
553         struct sockaddr_un addr;
554         socklen_t len;
555         int fd;
556         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
557         struct ctdb_client *client;
558         struct ctdb_client_pid_list *client_pid;
559 #ifdef _AIX
560         struct peercred_struct cr;
561         socklen_t crl = sizeof(struct peercred_struct);
562 #else
563         struct ucred cr;
564         socklen_t crl = sizeof(struct ucred);
565 #endif
566
567         memset(&addr, 0, sizeof(addr));
568         len = sizeof(addr);
569         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
570         if (fd == -1) {
571                 return;
572         }
573
574         set_nonblocking(fd);
575         set_close_on_exec(fd);
576
577         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
578
579         client = talloc_zero(ctdb, struct ctdb_client);
580 #ifdef _AIX
581         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
582 #else
583         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
584 #endif
585                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
586         }
587
588         client->ctdb = ctdb;
589         client->fd = fd;
590         client->client_id = ctdb_reqid_new(ctdb, client);
591         client->pid = cr.pid;
592
593         client_pid = talloc(client, struct ctdb_client_pid_list);
594         if (client_pid == NULL) {
595                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
596                 close(fd);
597                 talloc_free(client);
598                 return;
599         }               
600         client_pid->ctdb   = ctdb;
601         client_pid->pid    = cr.pid;
602         client_pid->client = client;
603
604         DLIST_ADD(ctdb->client_pids, client_pid);
605
606         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
607                                          ctdb_daemon_read_cb, client);
608
609         talloc_set_destructor(client, ctdb_client_destructor);
610         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
611         ctdb->statistics.num_clients++;
612 }
613
614
615
616 /*
617   create a unix domain socket and bind it
618   return a file descriptor open on the socket 
619 */
620 static int ux_socket_bind(struct ctdb_context *ctdb)
621 {
622         struct sockaddr_un addr;
623
624         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
625         if (ctdb->daemon.sd == -1) {
626                 return -1;
627         }
628
629         set_close_on_exec(ctdb->daemon.sd);
630         set_nonblocking(ctdb->daemon.sd);
631
632         memset(&addr, 0, sizeof(addr));
633         addr.sun_family = AF_UNIX;
634         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
635
636         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
637                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
638                 goto failed;
639         }       
640
641         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
642             chmod(ctdb->daemon.name, 0700) != 0) {
643                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
644                 goto failed;
645         } 
646
647
648         if (listen(ctdb->daemon.sd, 100) != 0) {
649                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
650                 goto failed;
651         }
652
653         return 0;
654
655 failed:
656         close(ctdb->daemon.sd);
657         ctdb->daemon.sd = -1;
658         return -1;      
659 }
660
661 static void sig_child_handler(struct event_context *ev,
662         struct signal_event *se, int signum, int count,
663         void *dont_care, 
664         void *private_data)
665 {
666 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
667         int status;
668         pid_t pid = -1;
669
670         while (pid != 0) {
671                 pid = waitpid(-1, &status, WNOHANG);
672                 if (pid == -1) {
673                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
674                         return;
675                 }
676                 if (pid > 0) {
677                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
678                 }
679         }
680 }
681
682 /*
683   start the protocol going as a daemon
684 */
685 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
686 {
687         int res, ret = -1;
688         struct fd_event *fde;
689         const char *domain_socket_name;
690         struct signal_event *se;
691
692         /* get rid of any old sockets */
693         unlink(ctdb->daemon.name);
694
695         /* create a unix domain stream socket to listen to */
696         res = ux_socket_bind(ctdb);
697         if (res!=0) {
698                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
699                 exit(10);
700         }
701
702         if (do_fork && fork()) {
703                 return 0;
704         }
705
706         tdb_reopen_all(False);
707
708         if (do_fork) {
709                 setsid();
710                 close(0);
711                 if (open("/dev/null", O_RDONLY) != 0) {
712                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
713                         exit(11);
714                 }
715         }
716         block_signal(SIGPIPE);
717
718         ctdbd_pid = getpid();
719
720
721         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
722
723         if (ctdb->do_setsched) {
724                 /* try to set us up as realtime */
725                 ctdb_set_scheduler(ctdb);
726         }
727
728         /* ensure the socket is deleted on exit of the daemon */
729         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
730         if (domain_socket_name == NULL) {
731                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
732                 exit(12);
733         }
734
735         ctdb->ev = event_context_init(NULL);
736
737         ctdb_set_child_logging(ctdb);
738
739         /* force initial recovery for election */
740         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
741
742         if (strcmp(ctdb->transport, "tcp") == 0) {
743                 int ctdb_tcp_init(struct ctdb_context *);
744                 ret = ctdb_tcp_init(ctdb);
745         }
746 #ifdef USE_INFINIBAND
747         if (strcmp(ctdb->transport, "ib") == 0) {
748                 int ctdb_ibw_init(struct ctdb_context *);
749                 ret = ctdb_ibw_init(ctdb);
750         }
751 #endif
752         if (ret != 0) {
753                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
754                 return -1;
755         }
756
757         if (ctdb->methods == NULL) {
758                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
759                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
760         }
761
762         /* initialise the transport  */
763         if (ctdb->methods->initialise(ctdb) != 0) {
764                 ctdb_fatal(ctdb, "transport failed to initialise");
765         }
766
767         /* attach to any existing persistent databases */
768         if (ctdb_attach_persistent(ctdb) != 0) {
769                 ctdb_fatal(ctdb, "Failed to attach to persistent databases\n");         
770         }
771
772         /* start frozen, then let the first election sort things out */
773         if (ctdb_blocking_freeze(ctdb)) {
774                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
775         }
776
777         /* now start accepting clients, only can do this once frozen */
778         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
779                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
780                            ctdb_accept_client, ctdb);
781
782         /* tell all other nodes we've just started up */
783         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
784                                  0, CTDB_CONTROL_STARTUP, 0,
785                                  CTDB_CTRL_FLAG_NOREPLY,
786                                  tdb_null, NULL, NULL);
787
788         /* release any IPs we hold from previous runs of the daemon */
789         ctdb_release_all_ips(ctdb);
790
791         /* start the transport going */
792         ctdb_start_transport(ctdb);
793
794         /* set up a handler to pick up sigchld */
795         se = event_add_signal(ctdb->ev, ctdb,
796                                      SIGCHLD, 0,
797                                      sig_child_handler,
798                                      ctdb);
799         if (se == NULL) {
800                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
801                 exit(1);
802         }
803
804         if (use_syslog) {
805                 if (start_syslog_daemon(ctdb)) {
806                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
807                         exit(10);
808                 }
809         }
810
811           
812         /* go into a wait loop to allow other nodes to complete */
813         event_loop_wait(ctdb->ev);
814
815         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
816         exit(1);
817 }
818
819 /*
820   allocate a packet for use in daemon<->daemon communication
821  */
822 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
823                                                  TALLOC_CTX *mem_ctx, 
824                                                  enum ctdb_operation operation, 
825                                                  size_t length, size_t slength,
826                                                  const char *type)
827 {
828         int size;
829         struct ctdb_req_header *hdr;
830
831         length = MAX(length, slength);
832         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
833
834         if (ctdb->methods == NULL) {
835                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
836                          operation, (unsigned)length));
837                 return NULL;
838         }
839
840         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
841         if (hdr == NULL) {
842                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
843                          operation, (unsigned)length));
844                 return NULL;
845         }
846         talloc_set_name_const(hdr, type);
847         memset(hdr, 0, slength);
848         hdr->length       = length;
849         hdr->operation    = operation;
850         hdr->ctdb_magic   = CTDB_MAGIC;
851         hdr->ctdb_version = CTDB_VERSION;
852         hdr->generation   = ctdb->vnn_map->generation;
853         hdr->srcnode      = ctdb->pnn;
854
855         return hdr;     
856 }
857
858 struct daemon_control_state {
859         struct daemon_control_state *next, *prev;
860         struct ctdb_client *client;
861         struct ctdb_req_control *c;
862         uint32_t reqid;
863         struct ctdb_node *node;
864 };
865
866 /*
867   callback when a control reply comes in
868  */
869 static void daemon_control_callback(struct ctdb_context *ctdb,
870                                     int32_t status, TDB_DATA data, 
871                                     const char *errormsg,
872                                     void *private_data)
873 {
874         struct daemon_control_state *state = talloc_get_type(private_data, 
875                                                              struct daemon_control_state);
876         struct ctdb_client *client = state->client;
877         struct ctdb_reply_control *r;
878         size_t len;
879
880         /* construct a message to send to the client containing the data */
881         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
882         if (errormsg) {
883                 len += strlen(errormsg);
884         }
885         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
886                                struct ctdb_reply_control);
887         CTDB_NO_MEMORY_VOID(ctdb, r);
888
889         r->hdr.reqid     = state->reqid;
890         r->status        = status;
891         r->datalen       = data.dsize;
892         r->errorlen = 0;
893         memcpy(&r->data[0], data.dptr, data.dsize);
894         if (errormsg) {
895                 r->errorlen = strlen(errormsg);
896                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
897         }
898
899         daemon_queue_send(client, &r->hdr);
900
901         talloc_free(state);
902 }
903
904 /*
905   fail all pending controls to a disconnected node
906  */
907 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
908 {
909         struct daemon_control_state *state;
910         while ((state = node->pending_controls)) {
911                 DLIST_REMOVE(node->pending_controls, state);
912                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
913                                         "node is disconnected", state);
914         }
915 }
916
917 /*
918   destroy a daemon_control_state
919  */
920 static int daemon_control_destructor(struct daemon_control_state *state)
921 {
922         if (state->node) {
923                 DLIST_REMOVE(state->node->pending_controls, state);
924         }
925         return 0;
926 }
927
928 /*
929   this is called when the ctdb daemon received a ctdb request control
930   from a local client over the unix domain socket
931  */
932 static void daemon_request_control_from_client(struct ctdb_client *client, 
933                                                struct ctdb_req_control *c)
934 {
935         TDB_DATA data;
936         int res;
937         struct daemon_control_state *state;
938         TALLOC_CTX *tmp_ctx = talloc_new(client);
939
940         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
941                 c->hdr.destnode = client->ctdb->pnn;
942         }
943
944         state = talloc(client, struct daemon_control_state);
945         CTDB_NO_MEMORY_VOID(client->ctdb, state);
946
947         state->client = client;
948         state->c = talloc_steal(state, c);
949         state->reqid = c->hdr.reqid;
950         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
951                 state->node = client->ctdb->nodes[c->hdr.destnode];
952                 DLIST_ADD(state->node->pending_controls, state);
953         } else {
954                 state->node = NULL;
955         }
956
957         talloc_set_destructor(state, daemon_control_destructor);
958
959         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
960                 talloc_steal(tmp_ctx, state);
961         }
962         
963         data.dptr = &c->data[0];
964         data.dsize = c->datalen;
965         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
966                                        c->srvid, c->opcode, client->client_id,
967                                        c->flags,
968                                        data, daemon_control_callback,
969                                        state);
970         if (res != 0) {
971                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
972                          c->hdr.destnode));
973         }
974
975         talloc_free(tmp_ctx);
976 }
977
978 /*
979   register a call function
980 */
981 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
982                          ctdb_fn_t fn, int id)
983 {
984         struct ctdb_registered_call *call;
985         struct ctdb_db_context *ctdb_db;
986
987         ctdb_db = find_ctdb_db(ctdb, db_id);
988         if (ctdb_db == NULL) {
989                 return -1;
990         }
991
992         call = talloc(ctdb_db, struct ctdb_registered_call);
993         call->fn = fn;
994         call->id = id;
995
996         DLIST_ADD(ctdb_db->calls, call);        
997         return 0;
998 }
999
1000
1001
1002 /*
1003   this local messaging handler is ugly, but is needed to prevent
1004   recursion in ctdb_send_message() when the destination node is the
1005   same as the source node
1006  */
1007 struct ctdb_local_message {
1008         struct ctdb_context *ctdb;
1009         uint64_t srvid;
1010         TDB_DATA data;
1011 };
1012
1013 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1014                                        struct timeval t, void *private_data)
1015 {
1016         struct ctdb_local_message *m = talloc_get_type(private_data, 
1017                                                        struct ctdb_local_message);
1018         int res;
1019
1020         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1021         if (res != 0) {
1022                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1023                           (unsigned long long)m->srvid));
1024         }
1025         talloc_free(m);
1026 }
1027
1028 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1029 {
1030         struct ctdb_local_message *m;
1031         m = talloc(ctdb, struct ctdb_local_message);
1032         CTDB_NO_MEMORY(ctdb, m);
1033
1034         m->ctdb = ctdb;
1035         m->srvid = srvid;
1036         m->data  = data;
1037         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1038         if (m->data.dptr == NULL) {
1039                 talloc_free(m);
1040                 return -1;
1041         }
1042
1043         /* this needs to be done as an event to prevent recursion */
1044         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1045         return 0;
1046 }
1047
1048 /*
1049   send a ctdb message
1050 */
1051 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1052                              uint64_t srvid, TDB_DATA data)
1053 {
1054         struct ctdb_req_message *r;
1055         int len;
1056
1057         if (ctdb->methods == NULL) {
1058                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message. Transport is DOWN\n"));
1059                 return -1;
1060         }
1061
1062         /* see if this is a message to ourselves */
1063         if (pnn == ctdb->pnn) {
1064                 return ctdb_local_message(ctdb, srvid, data);
1065         }
1066
1067         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1068         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1069                                     struct ctdb_req_message);
1070         CTDB_NO_MEMORY(ctdb, r);
1071
1072         r->hdr.destnode  = pnn;
1073         r->srvid         = srvid;
1074         r->datalen       = data.dsize;
1075         memcpy(&r->data[0], data.dptr, data.dsize);
1076
1077         ctdb_queue_packet(ctdb, &r->hdr);
1078
1079         talloc_free(r);
1080         return 0;
1081 }
1082
1083
1084
1085 struct ctdb_client_notify_list {
1086         struct ctdb_client_notify_list *next, *prev;
1087         struct ctdb_context *ctdb;
1088         uint64_t srvid;
1089         TDB_DATA data;
1090 };
1091
1092
1093 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1094 {
1095         int ret;
1096
1097         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1098
1099         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1100         if (ret != 0) {
1101                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1102         }
1103
1104         return 0;
1105 }
1106
1107 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1108 {
1109         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1110         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1111         struct ctdb_client_notify_list *nl;
1112
1113         DEBUG(DEBUG_ERR,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1114
1115         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1116                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1117                 return -1;
1118         }
1119
1120         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1121                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1122                 return -1;
1123         }
1124
1125
1126         if (client == NULL) {
1127                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1128                 return -1;
1129         }
1130
1131         for(nl=client->notify; nl; nl=nl->next) {
1132                 if (nl->srvid == notify->srvid) {
1133                         break;
1134                 }
1135         }
1136         if (nl != NULL) {
1137                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1138                 return -1;
1139         }
1140
1141         nl = talloc(client, struct ctdb_client_notify_list);
1142         CTDB_NO_MEMORY(ctdb, nl);
1143         nl->ctdb       = ctdb;
1144         nl->srvid      = notify->srvid;
1145         nl->data.dsize = notify->len;
1146         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1147         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1148         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1149         
1150         DLIST_ADD(client->notify, nl);
1151         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1152
1153         return 0;
1154 }
1155
1156 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1157 {
1158         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1159         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1160         struct ctdb_client_notify_list *nl;
1161
1162         DEBUG(DEBUG_ERR,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1163
1164         if (client == NULL) {
1165                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1166                 return -1;
1167         }
1168
1169         for(nl=client->notify; nl; nl=nl->next) {
1170                 if (nl->srvid == notify->srvid) {
1171                         break;
1172                 }
1173         }
1174         if (nl == NULL) {
1175                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1176                 return -1;
1177         }
1178
1179         DLIST_REMOVE(client->notify, nl);
1180         talloc_set_destructor(nl, NULL);
1181         talloc_free(nl);
1182
1183         return 0;
1184 }
1185
1186 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1187 {
1188         struct ctdb_client_pid_list *client_pid;
1189
1190         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1191                 if (client_pid->pid == pid) {
1192                         return client_pid->client;
1193                 }
1194         }
1195         return NULL;
1196 }
1197
1198
1199 /* This control is used by samba when probing if a process (of a samba daemon)
1200    exists on the node.
1201    Samba does this when it needs/wants to check if a subrecord in one of the
1202    databases is still valied, or if it is stale and can be removed.
1203    If the node is in unhealthy or stopped state we just kill of the samba
1204    process holding htis sub-record and return to the calling samba that
1205    the process does not exist.
1206    This allows us to forcefully recall subrecords registered by samba processes
1207    on banned and stopped nodes.
1208 */
1209 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1210 {
1211         struct ctdb_client *client;
1212
1213         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1214                 client = ctdb_find_client_by_pid(ctdb, pid);
1215                 if (client != NULL) {
1216                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1217                         talloc_free(client);
1218                 }
1219                 return -1;
1220         }
1221
1222         return kill(pid, 0);
1223 }