add a new ctdb_ltdb function to delete a record in a normal database
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46 /* called when the "startup" event script has finished */
47 static void ctdb_start_transport(struct ctdb_context *ctdb)
48 {
49         if (ctdb->methods == NULL) {
50                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
51                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
52         }
53
54         /* start the transport running */
55         if (ctdb->methods->start(ctdb) != 0) {
56                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
57                 ctdb_fatal(ctdb, "transport failed to start");
58         }
59
60         /* start the recovery daemon process */
61         if (ctdb_start_recoverd(ctdb) != 0) {
62                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
63                 exit(11);
64         }
65
66         /* Make sure we log something when the daemon terminates */
67         atexit(print_exit_message);
68
69         /* start monitoring for connected/disconnected nodes */
70         ctdb_start_keepalive(ctdb);
71
72         /* start monitoring for node health */
73         ctdb_start_monitoring(ctdb);
74
75         /* start periodic update of tcp tickle lists */
76         ctdb_start_tcp_tickle_update(ctdb);
77
78         /* start listening for recovery daemon pings */
79         ctdb_control_recd_ping(ctdb);
80 }
81
82 static void block_signal(int signum)
83 {
84         struct sigaction act;
85
86         memset(&act, 0, sizeof(act));
87
88         act.sa_handler = SIG_IGN;
89         sigemptyset(&act.sa_mask);
90         sigaddset(&act.sa_mask, signum);
91         sigaction(signum, &act, NULL);
92 }
93
94
95 /*
96   send a packet to a client
97  */
98 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
99 {
100         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
101         if (hdr->operation == CTDB_REQ_MESSAGE) {
102                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
103                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
104                         talloc_free(client);
105                         return -1;
106                 }
107         }
108         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
109 }
110
111 /*
112   message handler for when we are in daemon mode. This redirects the message
113   to the right client
114  */
115 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
116                                     TDB_DATA data, void *private_data)
117 {
118         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
119         struct ctdb_req_message *r;
120         int len;
121
122         /* construct a message to send to the client containing the data */
123         len = offsetof(struct ctdb_req_message, data) + data.dsize;
124         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
125                                len, struct ctdb_req_message);
126         CTDB_NO_MEMORY_VOID(ctdb, r);
127
128         talloc_set_name_const(r, "req_message packet");
129
130         r->srvid         = srvid;
131         r->datalen       = data.dsize;
132         memcpy(&r->data[0], data.dptr, data.dsize);
133
134         daemon_queue_send(client, &r->hdr);
135
136         talloc_free(r);
137 }
138
139 /*
140   this is called when the ctdb daemon received a ctdb request to 
141   set the srvid from the client
142  */
143 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
144 {
145         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
146         int res;
147         if (client == NULL) {
148                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
149                 return -1;
150         }
151         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
152         if (res != 0) {
153                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
154                          (unsigned long long)srvid));
155         } else {
156                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
157                          (unsigned long long)srvid));
158         }
159
160         return res;
161 }
162
163 /*
164   this is called when the ctdb daemon received a ctdb request to 
165   remove a srvid from the client
166  */
167 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
168 {
169         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
170         if (client == NULL) {
171                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
172                 return -1;
173         }
174         return ctdb_deregister_message_handler(ctdb, srvid, client);
175 }
176
177
178 /*
179   destroy a ctdb_client
180 */
181 static int ctdb_client_destructor(struct ctdb_client *client)
182 {
183         struct ctdb_db_context *ctdb_db;
184
185         ctdb_takeover_client_destructor_hook(client);
186         ctdb_reqid_remove(client->ctdb, client->client_id);
187         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
188
189         if (client->num_persistent_updates != 0) {
190                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
191                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
192         }
193         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
194         if (ctdb_db) {
195                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
196                                   "commit active. Forcing recovery.\n"));
197                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
198                 ctdb_db->transaction_active = false;
199         }
200
201         return 0;
202 }
203
204
205 /*
206   this is called when the ctdb daemon received a ctdb request message
207   from a local client over the unix domain socket
208  */
209 static void daemon_request_message_from_client(struct ctdb_client *client, 
210                                                struct ctdb_req_message *c)
211 {
212         TDB_DATA data;
213         int res;
214
215         /* maybe the message is for another client on this node */
216         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
217                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
218                 return;
219         }
220
221         /* its for a remote node */
222         data.dptr = &c->data[0];
223         data.dsize = c->datalen;
224         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
225                                        c->srvid, data);
226         if (res != 0) {
227                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
228                          c->hdr.destnode));
229         }
230 }
231
232
233 struct daemon_call_state {
234         struct ctdb_client *client;
235         uint32_t reqid;
236         struct ctdb_call *call;
237         struct timeval start_time;
238 };
239
240 /* 
241    complete a call from a client 
242 */
243 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
244 {
245         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
246                                                            struct daemon_call_state);
247         struct ctdb_reply_call *r;
248         int res;
249         uint32_t length;
250         struct ctdb_client *client = dstate->client;
251         struct ctdb_db_context *ctdb_db = state->ctdb_db;
252
253         talloc_steal(client, dstate);
254         talloc_steal(dstate, dstate->call);
255
256         res = ctdb_daemon_call_recv(state, dstate->call);
257         if (res != 0) {
258                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
259                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
260
261                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
262                 return;
263         }
264
265         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
266         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
267                                length, struct ctdb_reply_call);
268         if (r == NULL) {
269                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
270                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
271                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
272                 return;
273         }
274         r->hdr.reqid        = dstate->reqid;
275         r->datalen          = dstate->call->reply_data.dsize;
276         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
277
278         res = daemon_queue_send(client, &r->hdr);
279         if (res == -1) {
280                 /* client is dead - return immediately */
281                 return;
282         }
283         if (res != 0) {
284                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
285         }
286         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
287         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
288         talloc_free(dstate);
289 }
290
291 struct ctdb_daemon_packet_wrap {
292         struct ctdb_context *ctdb;
293         uint32_t client_id;
294 };
295
296 /*
297   a wrapper to catch disconnected clients
298  */
299 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
300 {
301         struct ctdb_client *client;
302         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
303                                                             struct ctdb_daemon_packet_wrap);
304         if (w == NULL) {
305                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
306                 return;
307         }
308
309         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
310         if (client == NULL) {
311                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
312                          w->client_id));
313                 talloc_free(w);
314                 return;
315         }
316         talloc_free(w);
317
318         /* process it */
319         daemon_incoming_packet(client, hdr);    
320 }
321
322
323 /*
324   this is called when the ctdb daemon received a ctdb request call
325   from a local client over the unix domain socket
326  */
327 static void daemon_request_call_from_client(struct ctdb_client *client, 
328                                             struct ctdb_req_call *c)
329 {
330         struct ctdb_call_state *state;
331         struct ctdb_db_context *ctdb_db;
332         struct daemon_call_state *dstate;
333         struct ctdb_call *call;
334         struct ctdb_ltdb_header header;
335         TDB_DATA key, data;
336         int ret;
337         struct ctdb_context *ctdb = client->ctdb;
338         struct ctdb_daemon_packet_wrap *w;
339
340         CTDB_INCREMENT_STAT(ctdb, total_calls);
341         CTDB_DECREMENT_STAT(ctdb, pending_calls);
342
343         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
344         if (!ctdb_db) {
345                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
346                           c->db_id));
347                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
348                 return;
349         }
350
351         if (ctdb_db->unhealthy_reason) {
352                 /*
353                  * this is just a warning, as the tdb should be empty anyway,
354                  * and only persistent databases can be unhealthy, which doesn't
355                  * use this code patch
356                  */
357                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
358                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
359         }
360
361         key.dptr = c->data;
362         key.dsize = c->keylen;
363
364         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
365         CTDB_NO_MEMORY_VOID(ctdb, w);   
366
367         w->ctdb = ctdb;
368         w->client_id = client->client_id;
369
370         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
371                                            (struct ctdb_req_header *)c, &data,
372                                            daemon_incoming_packet_wrap, w, True);
373         if (ret == -2) {
374                 /* will retry later */
375                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
376                 return;
377         }
378
379         talloc_free(w);
380
381         if (ret != 0) {
382                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
383                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
384                 return;
385         }
386
387         dstate = talloc(client, struct daemon_call_state);
388         if (dstate == NULL) {
389                 ret = ctdb_ltdb_unlock(ctdb_db, key);
390                 if (ret != 0) {
391                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
392                 }
393
394                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
395                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
396                 return;
397         }
398         dstate->start_time = timeval_current();
399         dstate->client = client;
400         dstate->reqid  = c->hdr.reqid;
401         talloc_steal(dstate, data.dptr);
402
403         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
404         if (call == NULL) {
405                 ret = ctdb_ltdb_unlock(ctdb_db, key);
406                 if (ret != 0) {
407                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
408                 }
409
410                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
411                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
412                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
413                 return;
414         }
415
416         call->call_id = c->callid;
417         call->key = key;
418         call->call_data.dptr = c->data + c->keylen;
419         call->call_data.dsize = c->calldatalen;
420         call->flags = c->flags;
421
422         if (header.dmaster == ctdb->pnn) {
423                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
424         } else {
425                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
426         }
427
428         ret = ctdb_ltdb_unlock(ctdb_db, key);
429         if (ret != 0) {
430                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
431         }
432
433         if (state == NULL) {
434                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
435                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
436                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
437                 return;
438         }
439         talloc_steal(state, dstate);
440         talloc_steal(client, state);
441
442         state->async.fn = daemon_call_from_client_callback;
443         state->async.private_data = dstate;
444 }
445
446
447 static void daemon_request_control_from_client(struct ctdb_client *client, 
448                                                struct ctdb_req_control *c);
449
450 /* data contains a packet from the client */
451 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
452 {
453         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
454         TALLOC_CTX *tmp_ctx;
455         struct ctdb_context *ctdb = client->ctdb;
456
457         /* place the packet as a child of a tmp_ctx. We then use
458            talloc_free() below to free it. If any of the calls want
459            to keep it, then they will steal it somewhere else, and the
460            talloc_free() will be a no-op */
461         tmp_ctx = talloc_new(client);
462         talloc_steal(tmp_ctx, hdr);
463
464         if (hdr->ctdb_magic != CTDB_MAGIC) {
465                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
466                 goto done;
467         }
468
469         if (hdr->ctdb_version != CTDB_VERSION) {
470                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
471                 goto done;
472         }
473
474         switch (hdr->operation) {
475         case CTDB_REQ_CALL:
476                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
477                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
478                 break;
479
480         case CTDB_REQ_MESSAGE:
481                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
482                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
483                 break;
484
485         case CTDB_REQ_CONTROL:
486                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
487                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
488                 break;
489
490         default:
491                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
492                          hdr->operation));
493         }
494
495 done:
496         talloc_free(tmp_ctx);
497 }
498
499 /*
500   called when the daemon gets a incoming packet
501  */
502 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
503 {
504         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
505         struct ctdb_req_header *hdr;
506
507         if (cnt == 0) {
508                 talloc_free(client);
509                 return;
510         }
511
512         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
513
514         if (cnt < sizeof(*hdr)) {
515                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
516                                (unsigned)cnt);
517                 return;
518         }
519         hdr = (struct ctdb_req_header *)data;
520         if (cnt != hdr->length) {
521                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
522                                (unsigned)hdr->length, (unsigned)cnt);
523                 return;
524         }
525
526         if (hdr->ctdb_magic != CTDB_MAGIC) {
527                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
528                 return;
529         }
530
531         if (hdr->ctdb_version != CTDB_VERSION) {
532                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
533                 return;
534         }
535
536         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
537                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
538                  hdr->srcnode, hdr->destnode));
539
540         /* it is the responsibility of the incoming packet function to free 'data' */
541         daemon_incoming_packet(client, hdr);
542 }
543
544
545 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
546 {
547         if (client_pid->ctdb->client_pids != NULL) {
548                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
549         }
550
551         return 0;
552 }
553
554
555 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
556                          uint16_t flags, void *private_data)
557 {
558         struct sockaddr_un addr;
559         socklen_t len;
560         int fd;
561         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
562         struct ctdb_client *client;
563         struct ctdb_client_pid_list *client_pid;
564 #ifdef _AIX
565         struct peercred_struct cr;
566         socklen_t crl = sizeof(struct peercred_struct);
567 #else
568         struct ucred cr;
569         socklen_t crl = sizeof(struct ucred);
570 #endif
571
572         memset(&addr, 0, sizeof(addr));
573         len = sizeof(addr);
574         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
575         if (fd == -1) {
576                 return;
577         }
578
579         set_nonblocking(fd);
580         set_close_on_exec(fd);
581
582         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
583
584         client = talloc_zero(ctdb, struct ctdb_client);
585 #ifdef _AIX
586         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
587 #else
588         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
589 #endif
590                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
591         }
592
593         client->ctdb = ctdb;
594         client->fd = fd;
595         client->client_id = ctdb_reqid_new(ctdb, client);
596         client->pid = cr.pid;
597
598         client_pid = talloc(client, struct ctdb_client_pid_list);
599         if (client_pid == NULL) {
600                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
601                 close(fd);
602                 talloc_free(client);
603                 return;
604         }               
605         client_pid->ctdb   = ctdb;
606         client_pid->pid    = cr.pid;
607         client_pid->client = client;
608
609         DLIST_ADD(ctdb->client_pids, client_pid);
610
611         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
612                                          ctdb_daemon_read_cb, client,
613                                          "client-%u", client->pid);
614
615         talloc_set_destructor(client, ctdb_client_destructor);
616         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
617         CTDB_INCREMENT_STAT(ctdb, num_clients);
618 }
619
620
621
622 /*
623   create a unix domain socket and bind it
624   return a file descriptor open on the socket 
625 */
626 static int ux_socket_bind(struct ctdb_context *ctdb)
627 {
628         struct sockaddr_un addr;
629
630         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
631         if (ctdb->daemon.sd == -1) {
632                 return -1;
633         }
634
635         set_close_on_exec(ctdb->daemon.sd);
636         set_nonblocking(ctdb->daemon.sd);
637
638         memset(&addr, 0, sizeof(addr));
639         addr.sun_family = AF_UNIX;
640         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
641
642         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
643                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
644                 goto failed;
645         }       
646
647         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
648             chmod(ctdb->daemon.name, 0700) != 0) {
649                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
650                 goto failed;
651         } 
652
653
654         if (listen(ctdb->daemon.sd, 100) != 0) {
655                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
656                 goto failed;
657         }
658
659         return 0;
660
661 failed:
662         close(ctdb->daemon.sd);
663         ctdb->daemon.sd = -1;
664         return -1;      
665 }
666
667 static void sig_child_handler(struct event_context *ev,
668         struct signal_event *se, int signum, int count,
669         void *dont_care, 
670         void *private_data)
671 {
672 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
673         int status;
674         pid_t pid = -1;
675
676         while (pid != 0) {
677                 pid = waitpid(-1, &status, WNOHANG);
678                 if (pid == -1) {
679                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
680                         return;
681                 }
682                 if (pid > 0) {
683                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
684                 }
685         }
686 }
687
688 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
689                                       void *private_data)
690 {
691         if (status != 0) {
692                 ctdb_fatal(ctdb, "Failed to run setup event\n");
693                 return;
694         }
695         ctdb_run_notification_script(ctdb, "setup");
696
697         /* tell all other nodes we've just started up */
698         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
699                                  0, CTDB_CONTROL_STARTUP, 0,
700                                  CTDB_CTRL_FLAG_NOREPLY,
701                                  tdb_null, NULL, NULL);
702 }
703
704 /*
705   start the protocol going as a daemon
706 */
707 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
708 {
709         int res, ret = -1;
710         struct fd_event *fde;
711         const char *domain_socket_name;
712         struct signal_event *se;
713
714         /* get rid of any old sockets */
715         unlink(ctdb->daemon.name);
716
717         /* create a unix domain stream socket to listen to */
718         res = ux_socket_bind(ctdb);
719         if (res!=0) {
720                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
721                 exit(10);
722         }
723
724         if (do_fork && fork()) {
725                 return 0;
726         }
727
728         tdb_reopen_all(False);
729
730         if (do_fork) {
731                 setsid();
732                 close(0);
733                 if (open("/dev/null", O_RDONLY) != 0) {
734                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
735                         exit(11);
736                 }
737         }
738         block_signal(SIGPIPE);
739
740         ctdbd_pid = getpid();
741
742
743         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
744
745         ctdb_high_priority(ctdb);
746
747         /* ensure the socket is deleted on exit of the daemon */
748         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
749         if (domain_socket_name == NULL) {
750                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
751                 exit(12);
752         }
753
754         ctdb->ev = event_context_init(NULL);
755         tevent_loop_allow_nesting(ctdb->ev);
756         ret = ctdb_init_tevent_logging(ctdb);
757         if (ret != 0) {
758                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
759                 exit(1);
760         }
761
762         ctdb_set_child_logging(ctdb);
763
764         /* initialize statistics collection */
765         ctdb_statistics_init(ctdb);
766
767         /* force initial recovery for election */
768         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
769
770         if (strcmp(ctdb->transport, "tcp") == 0) {
771                 int ctdb_tcp_init(struct ctdb_context *);
772                 ret = ctdb_tcp_init(ctdb);
773         }
774 #ifdef USE_INFINIBAND
775         if (strcmp(ctdb->transport, "ib") == 0) {
776                 int ctdb_ibw_init(struct ctdb_context *);
777                 ret = ctdb_ibw_init(ctdb);
778         }
779 #endif
780         if (ret != 0) {
781                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
782                 return -1;
783         }
784
785         if (ctdb->methods == NULL) {
786                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
787                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
788         }
789
790         /* initialise the transport  */
791         if (ctdb->methods->initialise(ctdb) != 0) {
792                 ctdb_fatal(ctdb, "transport failed to initialise");
793         }
794         if (public_address_list) {
795                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
796                 if (ret == -1) {
797                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
798                         exit(1);
799                 }
800         }
801
802
803         /* attach to existing databases */
804         if (ctdb_attach_databases(ctdb) != 0) {
805                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
806         }
807
808         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
809         if (ret != 0) {
810                 ctdb_fatal(ctdb, "Failed to run init event\n");
811         }
812         ctdb_run_notification_script(ctdb, "init");
813
814         /* start frozen, then let the first election sort things out */
815         if (ctdb_blocking_freeze(ctdb)) {
816                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
817         }
818
819         /* now start accepting clients, only can do this once frozen */
820         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
821                            EVENT_FD_READ,
822                            ctdb_accept_client, ctdb);
823         tevent_fd_set_auto_close(fde);
824
825         /* release any IPs we hold from previous runs of the daemon */
826         if (ctdb->tunable.disable_ip_failover == 0) {
827                 ctdb_release_all_ips(ctdb);
828         }
829
830         /* start the transport going */
831         ctdb_start_transport(ctdb);
832
833         /* set up a handler to pick up sigchld */
834         se = event_add_signal(ctdb->ev, ctdb,
835                                      SIGCHLD, 0,
836                                      sig_child_handler,
837                                      ctdb);
838         if (se == NULL) {
839                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
840                 exit(1);
841         }
842
843         ret = ctdb_event_script_callback(ctdb,
844                                          ctdb,
845                                          ctdb_setup_event_callback,
846                                          ctdb,
847                                          false,
848                                          CTDB_EVENT_SETUP,
849                                          "");
850         if (ret != 0) {
851                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
852                 exit(1);
853         }
854
855         if (use_syslog) {
856                 if (start_syslog_daemon(ctdb)) {
857                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
858                         exit(10);
859                 }
860         }
861
862         ctdb_lockdown_memory(ctdb);
863           
864         /* go into a wait loop to allow other nodes to complete */
865         event_loop_wait(ctdb->ev);
866
867         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
868         exit(1);
869 }
870
871 /*
872   allocate a packet for use in daemon<->daemon communication
873  */
874 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
875                                                  TALLOC_CTX *mem_ctx, 
876                                                  enum ctdb_operation operation, 
877                                                  size_t length, size_t slength,
878                                                  const char *type)
879 {
880         int size;
881         struct ctdb_req_header *hdr;
882
883         length = MAX(length, slength);
884         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
885
886         if (ctdb->methods == NULL) {
887                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
888                          operation, (unsigned)length));
889                 return NULL;
890         }
891
892         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
893         if (hdr == NULL) {
894                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
895                          operation, (unsigned)length));
896                 return NULL;
897         }
898         talloc_set_name_const(hdr, type);
899         memset(hdr, 0, slength);
900         hdr->length       = length;
901         hdr->operation    = operation;
902         hdr->ctdb_magic   = CTDB_MAGIC;
903         hdr->ctdb_version = CTDB_VERSION;
904         hdr->generation   = ctdb->vnn_map->generation;
905         hdr->srcnode      = ctdb->pnn;
906
907         return hdr;     
908 }
909
910 struct daemon_control_state {
911         struct daemon_control_state *next, *prev;
912         struct ctdb_client *client;
913         struct ctdb_req_control *c;
914         uint32_t reqid;
915         struct ctdb_node *node;
916 };
917
918 /*
919   callback when a control reply comes in
920  */
921 static void daemon_control_callback(struct ctdb_context *ctdb,
922                                     int32_t status, TDB_DATA data, 
923                                     const char *errormsg,
924                                     void *private_data)
925 {
926         struct daemon_control_state *state = talloc_get_type(private_data, 
927                                                              struct daemon_control_state);
928         struct ctdb_client *client = state->client;
929         struct ctdb_reply_control *r;
930         size_t len;
931         int ret;
932
933         /* construct a message to send to the client containing the data */
934         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
935         if (errormsg) {
936                 len += strlen(errormsg);
937         }
938         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
939                                struct ctdb_reply_control);
940         CTDB_NO_MEMORY_VOID(ctdb, r);
941
942         r->hdr.reqid     = state->reqid;
943         r->status        = status;
944         r->datalen       = data.dsize;
945         r->errorlen = 0;
946         memcpy(&r->data[0], data.dptr, data.dsize);
947         if (errormsg) {
948                 r->errorlen = strlen(errormsg);
949                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
950         }
951
952         ret = daemon_queue_send(client, &r->hdr);
953         if (ret != -1) {
954                 talloc_free(state);
955         }
956 }
957
958 /*
959   fail all pending controls to a disconnected node
960  */
961 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
962 {
963         struct daemon_control_state *state;
964         while ((state = node->pending_controls)) {
965                 DLIST_REMOVE(node->pending_controls, state);
966                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
967                                         "node is disconnected", state);
968         }
969 }
970
971 /*
972   destroy a daemon_control_state
973  */
974 static int daemon_control_destructor(struct daemon_control_state *state)
975 {
976         if (state->node) {
977                 DLIST_REMOVE(state->node->pending_controls, state);
978         }
979         return 0;
980 }
981
982 /*
983   this is called when the ctdb daemon received a ctdb request control
984   from a local client over the unix domain socket
985  */
986 static void daemon_request_control_from_client(struct ctdb_client *client, 
987                                                struct ctdb_req_control *c)
988 {
989         TDB_DATA data;
990         int res;
991         struct daemon_control_state *state;
992         TALLOC_CTX *tmp_ctx = talloc_new(client);
993
994         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
995                 c->hdr.destnode = client->ctdb->pnn;
996         }
997
998         state = talloc(client, struct daemon_control_state);
999         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1000
1001         state->client = client;
1002         state->c = talloc_steal(state, c);
1003         state->reqid = c->hdr.reqid;
1004         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1005                 state->node = client->ctdb->nodes[c->hdr.destnode];
1006                 DLIST_ADD(state->node->pending_controls, state);
1007         } else {
1008                 state->node = NULL;
1009         }
1010
1011         talloc_set_destructor(state, daemon_control_destructor);
1012
1013         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1014                 talloc_steal(tmp_ctx, state);
1015         }
1016         
1017         data.dptr = &c->data[0];
1018         data.dsize = c->datalen;
1019         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1020                                        c->srvid, c->opcode, client->client_id,
1021                                        c->flags,
1022                                        data, daemon_control_callback,
1023                                        state);
1024         if (res != 0) {
1025                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1026                          c->hdr.destnode));
1027         }
1028
1029         talloc_free(tmp_ctx);
1030 }
1031
1032 /*
1033   register a call function
1034 */
1035 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1036                          ctdb_fn_t fn, int id)
1037 {
1038         struct ctdb_registered_call *call;
1039         struct ctdb_db_context *ctdb_db;
1040
1041         ctdb_db = find_ctdb_db(ctdb, db_id);
1042         if (ctdb_db == NULL) {
1043                 return -1;
1044         }
1045
1046         call = talloc(ctdb_db, struct ctdb_registered_call);
1047         call->fn = fn;
1048         call->id = id;
1049
1050         DLIST_ADD(ctdb_db->calls, call);        
1051         return 0;
1052 }
1053
1054
1055
1056 /*
1057   this local messaging handler is ugly, but is needed to prevent
1058   recursion in ctdb_send_message() when the destination node is the
1059   same as the source node
1060  */
1061 struct ctdb_local_message {
1062         struct ctdb_context *ctdb;
1063         uint64_t srvid;
1064         TDB_DATA data;
1065 };
1066
1067 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1068                                        struct timeval t, void *private_data)
1069 {
1070         struct ctdb_local_message *m = talloc_get_type(private_data, 
1071                                                        struct ctdb_local_message);
1072         int res;
1073
1074         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1075         if (res != 0) {
1076                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1077                           (unsigned long long)m->srvid));
1078         }
1079         talloc_free(m);
1080 }
1081
1082 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1083 {
1084         struct ctdb_local_message *m;
1085         m = talloc(ctdb, struct ctdb_local_message);
1086         CTDB_NO_MEMORY(ctdb, m);
1087
1088         m->ctdb = ctdb;
1089         m->srvid = srvid;
1090         m->data  = data;
1091         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1092         if (m->data.dptr == NULL) {
1093                 talloc_free(m);
1094                 return -1;
1095         }
1096
1097         /* this needs to be done as an event to prevent recursion */
1098         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1099         return 0;
1100 }
1101
1102 /*
1103   send a ctdb message
1104 */
1105 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1106                              uint64_t srvid, TDB_DATA data)
1107 {
1108         struct ctdb_req_message *r;
1109         int len;
1110
1111         if (ctdb->methods == NULL) {
1112                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1113                 return -1;
1114         }
1115
1116         /* see if this is a message to ourselves */
1117         if (pnn == ctdb->pnn) {
1118                 return ctdb_local_message(ctdb, srvid, data);
1119         }
1120
1121         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1122         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1123                                     struct ctdb_req_message);
1124         CTDB_NO_MEMORY(ctdb, r);
1125
1126         r->hdr.destnode  = pnn;
1127         r->srvid         = srvid;
1128         r->datalen       = data.dsize;
1129         memcpy(&r->data[0], data.dptr, data.dsize);
1130
1131         ctdb_queue_packet(ctdb, &r->hdr);
1132
1133         talloc_free(r);
1134         return 0;
1135 }
1136
1137
1138
1139 struct ctdb_client_notify_list {
1140         struct ctdb_client_notify_list *next, *prev;
1141         struct ctdb_context *ctdb;
1142         uint64_t srvid;
1143         TDB_DATA data;
1144 };
1145
1146
1147 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1148 {
1149         int ret;
1150
1151         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1152
1153         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1154         if (ret != 0) {
1155                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1156         }
1157
1158         return 0;
1159 }
1160
1161 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1162 {
1163         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1164         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1165         struct ctdb_client_notify_list *nl;
1166
1167         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1168
1169         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1170                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1171                 return -1;
1172         }
1173
1174         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1175                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1176                 return -1;
1177         }
1178
1179
1180         if (client == NULL) {
1181                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1182                 return -1;
1183         }
1184
1185         for(nl=client->notify; nl; nl=nl->next) {
1186                 if (nl->srvid == notify->srvid) {
1187                         break;
1188                 }
1189         }
1190         if (nl != NULL) {
1191                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1192                 return -1;
1193         }
1194
1195         nl = talloc(client, struct ctdb_client_notify_list);
1196         CTDB_NO_MEMORY(ctdb, nl);
1197         nl->ctdb       = ctdb;
1198         nl->srvid      = notify->srvid;
1199         nl->data.dsize = notify->len;
1200         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1201         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1202         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1203         
1204         DLIST_ADD(client->notify, nl);
1205         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1206
1207         return 0;
1208 }
1209
1210 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1211 {
1212         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1213         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1214         struct ctdb_client_notify_list *nl;
1215
1216         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1217
1218         if (client == NULL) {
1219                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1220                 return -1;
1221         }
1222
1223         for(nl=client->notify; nl; nl=nl->next) {
1224                 if (nl->srvid == notify->srvid) {
1225                         break;
1226                 }
1227         }
1228         if (nl == NULL) {
1229                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1230                 return -1;
1231         }
1232
1233         DLIST_REMOVE(client->notify, nl);
1234         talloc_set_destructor(nl, NULL);
1235         talloc_free(nl);
1236
1237         return 0;
1238 }
1239
1240 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1241 {
1242         struct ctdb_client_pid_list *client_pid;
1243
1244         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1245                 if (client_pid->pid == pid) {
1246                         return client_pid->client;
1247                 }
1248         }
1249         return NULL;
1250 }
1251
1252
1253 /* This control is used by samba when probing if a process (of a samba daemon)
1254    exists on the node.
1255    Samba does this when it needs/wants to check if a subrecord in one of the
1256    databases is still valied, or if it is stale and can be removed.
1257    If the node is in unhealthy or stopped state we just kill of the samba
1258    process holding htis sub-record and return to the calling samba that
1259    the process does not exist.
1260    This allows us to forcefully recall subrecords registered by samba processes
1261    on banned and stopped nodes.
1262 */
1263 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1264 {
1265         struct ctdb_client *client;
1266
1267         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1268                 client = ctdb_find_client_by_pid(ctdb, pid);
1269                 if (client != NULL) {
1270                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1271                         talloc_free(client);
1272                 }
1273                 return -1;
1274         }
1275
1276         return kill(pid, 0);
1277 }