changed some debug levels
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/events/events.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb.h"
29 #include "../include/ctdb_private.h"
30
31 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
32
33 /*
34   handler for when a node changes its flags
35 */
36 static void flag_change_handler(struct ctdb_context *ctdb, uint64_t srvid, 
37                                 TDB_DATA data, void *private_data)
38 {
39         struct ctdb_node_flag_change *c = (struct ctdb_node_flag_change *)data.dptr;
40
41         if (data.dsize != sizeof(*c) || !ctdb_validate_pnn(ctdb, c->pnn)) {
42                 DEBUG(0,(__location__ "Invalid data in ctdb_node_flag_change\n"));
43                 return;
44         }
45
46         if (!ctdb_validate_pnn(ctdb, c->pnn)) {
47                 DEBUG(0,("Bad pnn %u in flag_change_handler\n", c->pnn));
48                 return;
49         }
50
51         /* don't get the disconnected flag from the other node */
52         ctdb->nodes[c->pnn]->flags = 
53                 (ctdb->nodes[c->pnn]->flags&NODE_FLAGS_DISCONNECTED) 
54                 | (c->new_flags & ~NODE_FLAGS_DISCONNECTED);    
55         DEBUG(2,("Node flags for node %u are now 0x%x\n", c->pnn, ctdb->nodes[c->pnn]->flags));
56
57         /* make sure we don't hold any IPs when we shouldn't */
58         if (c->pnn == ctdb->pnn &&
59             (ctdb->nodes[c->pnn]->flags & (NODE_FLAGS_INACTIVE|NODE_FLAGS_BANNED))) {
60                 ctdb_release_all_ips(ctdb);
61         }
62 }
63
64 /* called when the "startup" event script has finished */
65 static void ctdb_start_transport(struct ctdb_context *ctdb, int status, void *p)
66 {
67         if (status != 0) {
68                 DEBUG(0,("startup event failed!\n"));
69                 ctdb_fatal(ctdb, "startup event script failed");                
70         }
71
72         /* start the transport running */
73         if (ctdb->methods->start(ctdb) != 0) {
74                 DEBUG(0,("transport failed to start!\n"));
75                 ctdb_fatal(ctdb, "transport failed to start");
76         }
77
78         /* start the recovery daemon process */
79         if (ctdb_start_recoverd(ctdb) != 0) {
80                 DEBUG(0,("Failed to start recovery daemon\n"));
81                 exit(11);
82         }
83
84         /* a handler for when nodes are disabled/enabled */
85         ctdb_register_message_handler(ctdb, ctdb, CTDB_SRVID_NODE_FLAGS_CHANGED, 
86                                       flag_change_handler, NULL);
87
88         /* start monitoring for dead nodes */
89         ctdb_start_monitoring(ctdb);
90
91         /* start periodic update of tcp tickle lists */
92         ctdb_start_tcp_tickle_update(ctdb);
93 }
94
95 /* go into main ctdb loop */
96 static void ctdb_main_loop(struct ctdb_context *ctdb)
97 {
98         int ret = -1;
99
100         if (strcmp(ctdb->transport, "tcp") == 0) {
101                 int ctdb_tcp_init(struct ctdb_context *);
102                 ret = ctdb_tcp_init(ctdb);
103         }
104 #ifdef USE_INFINIBAND
105         if (strcmp(ctdb->transport, "ib") == 0) {
106                 int ctdb_ibw_init(struct ctdb_context *);
107                 ret = ctdb_ibw_init(ctdb);
108         }
109 #endif
110         if (ret != 0) {
111                 DEBUG(0,("Failed to initialise transport '%s'\n", ctdb->transport));
112                 return;
113         }
114
115         /* initialise the transport  */
116         if (ctdb->methods->initialise(ctdb) != 0) {
117                 DEBUG(0,("transport failed to initialise!\n"));
118                 ctdb_fatal(ctdb, "transport failed to initialise");
119         }
120
121         /* tell all other nodes we've just started up */
122         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
123                                  0, CTDB_CONTROL_STARTUP, 0,
124                                  CTDB_CTRL_FLAG_NOREPLY,
125                                  tdb_null, NULL, NULL);
126
127         /* release any IPs we hold from previous runs of the daemon */
128         ctdb_release_all_ips(ctdb);
129
130         ret = ctdb_event_script_callback(ctdb, timeval_zero(), ctdb, 
131                                          ctdb_start_transport, NULL, "startup");
132         if (ret != 0) {
133                 DEBUG(0,("Failed startup event script\n"));
134                 return;
135         }
136
137         /* go into a wait loop to allow other nodes to complete */
138         event_loop_wait(ctdb->ev);
139
140         DEBUG(0,("event_loop_wait() returned. this should not happen\n"));
141         exit(1);
142 }
143
144
145 static void block_signal(int signum)
146 {
147         struct sigaction act;
148
149         memset(&act, 0, sizeof(act));
150
151         act.sa_handler = SIG_IGN;
152         sigemptyset(&act.sa_mask);
153         sigaddset(&act.sa_mask, signum);
154         sigaction(signum, &act, NULL);
155 }
156
157
158 /*
159   send a packet to a client
160  */
161 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
162 {
163         client->ctdb->statistics.client_packets_sent++;
164         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
165 }
166
167 /*
168   message handler for when we are in daemon mode. This redirects the message
169   to the right client
170  */
171 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
172                                     TDB_DATA data, void *private_data)
173 {
174         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
175         struct ctdb_req_message *r;
176         int len;
177
178         /* construct a message to send to the client containing the data */
179         len = offsetof(struct ctdb_req_message, data) + data.dsize;
180         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
181                                len, struct ctdb_req_message);
182         CTDB_NO_MEMORY_VOID(ctdb, r);
183
184         talloc_set_name_const(r, "req_message packet");
185
186         r->srvid         = srvid;
187         r->datalen       = data.dsize;
188         memcpy(&r->data[0], data.dptr, data.dsize);
189
190         daemon_queue_send(client, &r->hdr);
191
192         talloc_free(r);
193 }
194                                            
195
196 /*
197   this is called when the ctdb daemon received a ctdb request to 
198   set the srvid from the client
199  */
200 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
201 {
202         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
203         int res;
204         if (client == NULL) {
205                 DEBUG(0,("Bad client_id in daemon_request_register_message_handler\n"));
206                 return -1;
207         }
208         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
209         if (res != 0) {
210                 DEBUG(0,(__location__ " Failed to register handler %llu in daemon\n", 
211                          (unsigned long long)srvid));
212         } else {
213                 DEBUG(1,(__location__ " Registered message handler for srvid=%llu\n", 
214                          (unsigned long long)srvid));
215         }
216
217         /* this is a hack for Samba - we now know the pid of the Samba client */
218         if ((srvid & 0xFFFFFFFF) == srvid &&
219             kill(srvid, 0) == 0) {
220                 client->pid = srvid;
221                 DEBUG(1,(__location__ " Registered PID %u for client %u\n",
222                          (unsigned)client->pid, client_id));
223         }
224         return res;
225 }
226
227 /*
228   this is called when the ctdb daemon received a ctdb request to 
229   remove a srvid from the client
230  */
231 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
232 {
233         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
234         if (client == NULL) {
235                 DEBUG(0,("Bad client_id in daemon_request_deregister_message_handler\n"));
236                 return -1;
237         }
238         return ctdb_deregister_message_handler(ctdb, srvid, client);
239 }
240
241
242 /*
243   destroy a ctdb_client
244 */
245 static int ctdb_client_destructor(struct ctdb_client *client)
246 {
247         ctdb_takeover_client_destructor_hook(client);
248         ctdb_reqid_remove(client->ctdb, client->client_id);
249         client->ctdb->statistics.num_clients--;
250         return 0;
251 }
252
253
254 /*
255   this is called when the ctdb daemon received a ctdb request message
256   from a local client over the unix domain socket
257  */
258 static void daemon_request_message_from_client(struct ctdb_client *client, 
259                                                struct ctdb_req_message *c)
260 {
261         TDB_DATA data;
262         int res;
263
264         /* maybe the message is for another client on this node */
265         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
266                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
267                 return;
268         }
269
270         /* its for a remote node */
271         data.dptr = &c->data[0];
272         data.dsize = c->datalen;
273         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
274                                        c->srvid, data);
275         if (res != 0) {
276                 DEBUG(0,(__location__ " Failed to send message to remote node %u\n",
277                          c->hdr.destnode));
278         }
279 }
280
281
282 struct daemon_call_state {
283         struct ctdb_client *client;
284         uint32_t reqid;
285         struct ctdb_call *call;
286         struct timeval start_time;
287 };
288
289 /* 
290    complete a call from a client 
291 */
292 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
293 {
294         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
295                                                            struct daemon_call_state);
296         struct ctdb_reply_call *r;
297         int res;
298         uint32_t length;
299         struct ctdb_client *client = dstate->client;
300
301         talloc_steal(client, dstate);
302         talloc_steal(dstate, dstate->call);
303
304         res = ctdb_daemon_call_recv(state, dstate->call);
305         if (res != 0) {
306                 DEBUG(0, (__location__ " ctdbd_call_recv() returned error\n"));
307                 client->ctdb->statistics.pending_calls--;
308                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
309                 return;
310         }
311
312         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
313         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
314                                length, struct ctdb_reply_call);
315         if (r == NULL) {
316                 DEBUG(0, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
317                 client->ctdb->statistics.pending_calls--;
318                 ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
319                 return;
320         }
321         r->hdr.reqid        = dstate->reqid;
322         r->datalen          = dstate->call->reply_data.dsize;
323         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
324
325         res = daemon_queue_send(client, &r->hdr);
326         if (res != 0) {
327                 DEBUG(0, (__location__ " Failed to queue packet from daemon to client\n"));
328         }
329         ctdb_latency(&client->ctdb->statistics.max_call_latency, dstate->start_time);
330         talloc_free(dstate);
331         client->ctdb->statistics.pending_calls--;
332 }
333
334
335 static void daemon_request_call_from_client(struct ctdb_client *client, 
336                                             struct ctdb_req_call *c);
337
338 /*
339   this is called when the ctdb daemon received a ctdb request call
340   from a local client over the unix domain socket
341  */
342 static void daemon_request_call_from_client(struct ctdb_client *client, 
343                                             struct ctdb_req_call *c)
344 {
345         struct ctdb_call_state *state;
346         struct ctdb_db_context *ctdb_db;
347         struct daemon_call_state *dstate;
348         struct ctdb_call *call;
349         struct ctdb_ltdb_header header;
350         TDB_DATA key, data;
351         int ret;
352         struct ctdb_context *ctdb = client->ctdb;
353
354         ctdb->statistics.total_calls++;
355         ctdb->statistics.pending_calls++;
356
357         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
358         if (!ctdb_db) {
359                 DEBUG(0, (__location__ " Unknown database in request. db_id==0x%08x",
360                           c->db_id));
361                 ctdb->statistics.pending_calls--;
362                 return;
363         }
364
365         key.dptr = c->data;
366         key.dsize = c->keylen;
367
368         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
369                                            (struct ctdb_req_header *)c, &data,
370                                            daemon_incoming_packet, client, True);
371         if (ret == -2) {
372                 /* will retry later */
373                 ctdb->statistics.pending_calls--;
374                 return;
375         }
376
377         if (ret != 0) {
378                 DEBUG(0,(__location__ " Unable to fetch record\n"));
379                 ctdb->statistics.pending_calls--;
380                 return;
381         }
382
383         dstate = talloc(client, struct daemon_call_state);
384         if (dstate == NULL) {
385                 ctdb_ltdb_unlock(ctdb_db, key);
386                 DEBUG(0,(__location__ " Unable to allocate dstate\n"));
387                 ctdb->statistics.pending_calls--;
388                 return;
389         }
390         dstate->start_time = timeval_current();
391         dstate->client = client;
392         dstate->reqid  = c->hdr.reqid;
393         talloc_steal(dstate, data.dptr);
394
395         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
396         if (call == NULL) {
397                 ctdb_ltdb_unlock(ctdb_db, key);
398                 DEBUG(0,(__location__ " Unable to allocate call\n"));
399                 ctdb->statistics.pending_calls--;
400                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
401                 return;
402         }
403
404         call->call_id = c->callid;
405         call->key = key;
406         call->call_data.dptr = c->data + c->keylen;
407         call->call_data.dsize = c->calldatalen;
408         call->flags = c->flags;
409
410         if (header.dmaster == ctdb->pnn) {
411                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
412         } else {
413                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
414         }
415
416         ctdb_ltdb_unlock(ctdb_db, key);
417
418         if (state == NULL) {
419                 DEBUG(0,(__location__ " Unable to setup call send\n"));
420                 ctdb->statistics.pending_calls--;
421                 ctdb_latency(&ctdb->statistics.max_call_latency, dstate->start_time);
422                 return;
423         }
424         talloc_steal(state, dstate);
425         talloc_steal(client, state);
426
427         state->async.fn = daemon_call_from_client_callback;
428         state->async.private_data = dstate;
429 }
430
431
432 static void daemon_request_control_from_client(struct ctdb_client *client, 
433                                                struct ctdb_req_control *c);
434
435 /* data contains a packet from the client */
436 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
437 {
438         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
439         TALLOC_CTX *tmp_ctx;
440         struct ctdb_context *ctdb = client->ctdb;
441
442         /* place the packet as a child of a tmp_ctx. We then use
443            talloc_free() below to free it. If any of the calls want
444            to keep it, then they will steal it somewhere else, and the
445            talloc_free() will be a no-op */
446         tmp_ctx = talloc_new(client);
447         talloc_steal(tmp_ctx, hdr);
448
449         if (hdr->ctdb_magic != CTDB_MAGIC) {
450                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
451                 goto done;
452         }
453
454         if (hdr->ctdb_version != CTDB_VERSION) {
455                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
456                 goto done;
457         }
458
459         switch (hdr->operation) {
460         case CTDB_REQ_CALL:
461                 ctdb->statistics.client.req_call++;
462                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
463                 break;
464
465         case CTDB_REQ_MESSAGE:
466                 ctdb->statistics.client.req_message++;
467                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
468                 break;
469
470         case CTDB_REQ_CONTROL:
471                 ctdb->statistics.client.req_control++;
472                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
473                 break;
474
475         default:
476                 DEBUG(0,(__location__ " daemon: unrecognized operation %u\n",
477                          hdr->operation));
478         }
479
480 done:
481         talloc_free(tmp_ctx);
482 }
483
484 /*
485   called when the daemon gets a incoming packet
486  */
487 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
488 {
489         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
490         struct ctdb_req_header *hdr;
491
492         if (cnt == 0) {
493                 talloc_free(client);
494                 return;
495         }
496
497         client->ctdb->statistics.client_packets_recv++;
498
499         if (cnt < sizeof(*hdr)) {
500                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
501                                (unsigned)cnt);
502                 return;
503         }
504         hdr = (struct ctdb_req_header *)data;
505         if (cnt != hdr->length) {
506                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
507                                (unsigned)hdr->length, (unsigned)cnt);
508                 return;
509         }
510
511         if (hdr->ctdb_magic != CTDB_MAGIC) {
512                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
513                 return;
514         }
515
516         if (hdr->ctdb_version != CTDB_VERSION) {
517                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
518                 return;
519         }
520
521         DEBUG(3,(__location__ " client request %u of type %u length %u from "
522                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
523                  hdr->srcnode, hdr->destnode));
524
525         /* it is the responsibility of the incoming packet function to free 'data' */
526         daemon_incoming_packet(client, hdr);
527 }
528
529 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
530                          uint16_t flags, void *private_data)
531 {
532         struct sockaddr_in addr;
533         socklen_t len;
534         int fd;
535         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
536         struct ctdb_client *client;
537
538         memset(&addr, 0, sizeof(addr));
539         len = sizeof(addr);
540         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
541         if (fd == -1) {
542                 return;
543         }
544
545         set_nonblocking(fd);
546         set_close_on_exec(fd);
547
548         client = talloc_zero(ctdb, struct ctdb_client);
549         client->ctdb = ctdb;
550         client->fd = fd;
551         client->client_id = ctdb_reqid_new(ctdb, client);
552         ctdb->statistics.num_clients++;
553
554         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
555                                          ctdb_daemon_read_cb, client);
556
557         talloc_set_destructor(client, ctdb_client_destructor);
558 }
559
560
561
562 /*
563   create a unix domain socket and bind it
564   return a file descriptor open on the socket 
565 */
566 static int ux_socket_bind(struct ctdb_context *ctdb)
567 {
568         struct sockaddr_un addr;
569
570         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
571         if (ctdb->daemon.sd == -1) {
572                 return -1;
573         }
574
575         set_nonblocking(ctdb->daemon.sd);
576         set_close_on_exec(ctdb->daemon.sd);
577
578 #if 0
579         /* AIX doesn't like this :( */
580         if (fchown(ctdb->daemon.sd, geteuid(), getegid()) != 0 ||
581             fchmod(ctdb->daemon.sd, 0700) != 0) {
582                 DEBUG(0,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n"));
583                 goto failed;
584         }
585 #endif
586
587         set_nonblocking(ctdb->daemon.sd);
588
589         memset(&addr, 0, sizeof(addr));
590         addr.sun_family = AF_UNIX;
591         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
592
593         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
594                 DEBUG(0,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
595                 goto failed;
596         }       
597         if (listen(ctdb->daemon.sd, 10) != 0) {
598                 DEBUG(0,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
599                 goto failed;
600         }
601
602         return 0;
603
604 failed:
605         close(ctdb->daemon.sd);
606         ctdb->daemon.sd = -1;
607         return -1;      
608 }
609
610 /*
611   delete the socket on exit - called on destruction of autofree context
612  */
613 static int unlink_destructor(const char *name)
614 {
615         unlink(name);
616         return 0;
617 }
618
619 static void print_exit_message(void)
620 {
621         DEBUG(0,("CTDB daemon shutting down\n"));
622 }
623
624 /*
625   start the protocol going as a daemon
626 */
627 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
628 {
629         int res;
630         struct fd_event *fde;
631         const char *domain_socket_name;
632
633         /* get rid of any old sockets */
634         unlink(ctdb->daemon.name);
635
636         /* create a unix domain stream socket to listen to */
637         res = ux_socket_bind(ctdb);
638         if (res!=0) {
639                 DEBUG(0,(__location__ " Failed to open CTDB unix domain socket\n"));
640                 exit(10);
641         }
642
643         if (do_fork && fork()) {
644                 return 0;
645         }
646
647         /* Make sure we log something when the daemon terminates */
648         atexit(print_exit_message);
649
650         tdb_reopen_all(False);
651
652         if (do_fork) {
653                 setsid();
654         }
655         block_signal(SIGPIPE);
656
657         if (ctdb->do_setsched) {
658                 /* try to set us up as realtime */
659                 ctdb_set_scheduler(ctdb);
660         }
661
662         /* ensure the socket is deleted on exit of the daemon */
663         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
664         talloc_set_destructor(domain_socket_name, unlink_destructor);   
665
666         ctdb->ev = event_context_init(NULL);
667
668         /* start frozen, then let the first election sort things out */
669         if (!ctdb_blocking_freeze(ctdb)) {
670                 DEBUG(0,("Failed to get initial freeze\n"));
671                 exit(12);
672         }
673
674         /* force initial recovery for election */
675         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
676
677         /* now start accepting clients, only can do this once frozen */
678         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
679                            EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
680                            ctdb_accept_client, ctdb);
681
682         ctdb_main_loop(ctdb);
683
684         return 0;
685 }
686
687 /*
688   allocate a packet for use in daemon<->daemon communication
689  */
690 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
691                                                  TALLOC_CTX *mem_ctx, 
692                                                  enum ctdb_operation operation, 
693                                                  size_t length, size_t slength,
694                                                  const char *type)
695 {
696         int size;
697         struct ctdb_req_header *hdr;
698
699         length = MAX(length, slength);
700         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
701
702         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
703         if (hdr == NULL) {
704                 DEBUG(0,("Unable to allocate transport packet for operation %u of length %u\n",
705                          operation, (unsigned)length));
706                 return NULL;
707         }
708         talloc_set_name_const(hdr, type);
709         memset(hdr, 0, slength);
710         hdr->length       = length;
711         hdr->operation    = operation;
712         hdr->ctdb_magic   = CTDB_MAGIC;
713         hdr->ctdb_version = CTDB_VERSION;
714         hdr->generation   = ctdb->vnn_map->generation;
715         hdr->srcnode      = ctdb->pnn;
716
717         return hdr;     
718 }
719
720 struct daemon_control_state {
721         struct daemon_control_state *next, *prev;
722         struct ctdb_client *client;
723         struct ctdb_req_control *c;
724         uint32_t reqid;
725         struct ctdb_node *node;
726 };
727
728 /*
729   callback when a control reply comes in
730  */
731 static void daemon_control_callback(struct ctdb_context *ctdb,
732                                     int32_t status, TDB_DATA data, 
733                                     const char *errormsg,
734                                     void *private_data)
735 {
736         struct daemon_control_state *state = talloc_get_type(private_data, 
737                                                              struct daemon_control_state);
738         struct ctdb_client *client = state->client;
739         struct ctdb_reply_control *r;
740         size_t len;
741
742         /* construct a message to send to the client containing the data */
743         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
744         if (errormsg) {
745                 len += strlen(errormsg);
746         }
747         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
748                                struct ctdb_reply_control);
749         CTDB_NO_MEMORY_VOID(ctdb, r);
750
751         r->hdr.reqid     = state->reqid;
752         r->status        = status;
753         r->datalen       = data.dsize;
754         r->errorlen = 0;
755         memcpy(&r->data[0], data.dptr, data.dsize);
756         if (errormsg) {
757                 r->errorlen = strlen(errormsg);
758                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
759         }
760
761         daemon_queue_send(client, &r->hdr);
762
763         talloc_free(state);
764 }
765
766 /*
767   fail all pending controls to a disconnected node
768  */
769 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
770 {
771         struct daemon_control_state *state;
772         while ((state = node->pending_controls)) {
773                 DLIST_REMOVE(node->pending_controls, state);
774                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
775                                         "node is disconnected", state);
776         }
777 }
778
779 /*
780   destroy a daemon_control_state
781  */
782 static int daemon_control_destructor(struct daemon_control_state *state)
783 {
784         if (state->node) {
785                 DLIST_REMOVE(state->node->pending_controls, state);
786         }
787         return 0;
788 }
789
790 /*
791   this is called when the ctdb daemon received a ctdb request control
792   from a local client over the unix domain socket
793  */
794 static void daemon_request_control_from_client(struct ctdb_client *client, 
795                                                struct ctdb_req_control *c)
796 {
797         TDB_DATA data;
798         int res;
799         struct daemon_control_state *state;
800         TALLOC_CTX *tmp_ctx = talloc_new(client);
801
802         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
803                 c->hdr.destnode = client->ctdb->pnn;
804         }
805
806         state = talloc(client, struct daemon_control_state);
807         CTDB_NO_MEMORY_VOID(client->ctdb, state);
808
809         state->client = client;
810         state->c = talloc_steal(state, c);
811         state->reqid = c->hdr.reqid;
812         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
813                 state->node = client->ctdb->nodes[c->hdr.destnode];
814                 DLIST_ADD(state->node->pending_controls, state);
815         } else {
816                 state->node = NULL;
817         }
818
819         talloc_set_destructor(state, daemon_control_destructor);
820
821         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
822                 talloc_steal(tmp_ctx, state);
823         }
824         
825         data.dptr = &c->data[0];
826         data.dsize = c->datalen;
827         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
828                                        c->srvid, c->opcode, client->client_id,
829                                        c->flags,
830                                        data, daemon_control_callback,
831                                        state);
832         if (res != 0) {
833                 DEBUG(0,(__location__ " Failed to send control to remote node %u\n",
834                          c->hdr.destnode));
835         }
836
837         talloc_free(tmp_ctx);
838 }
839
840 /*
841   register a call function
842 */
843 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
844                          ctdb_fn_t fn, int id)
845 {
846         struct ctdb_registered_call *call;
847         struct ctdb_db_context *ctdb_db;
848
849         ctdb_db = find_ctdb_db(ctdb, db_id);
850         if (ctdb_db == NULL) {
851                 return -1;
852         }
853
854         call = talloc(ctdb_db, struct ctdb_registered_call);
855         call->fn = fn;
856         call->id = id;
857
858         DLIST_ADD(ctdb_db->calls, call);        
859         return 0;
860 }
861
862
863
864 /*
865   this local messaging handler is ugly, but is needed to prevent
866   recursion in ctdb_send_message() when the destination node is the
867   same as the source node
868  */
869 struct ctdb_local_message {
870         struct ctdb_context *ctdb;
871         uint64_t srvid;
872         TDB_DATA data;
873 };
874
875 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
876                                        struct timeval t, void *private_data)
877 {
878         struct ctdb_local_message *m = talloc_get_type(private_data, 
879                                                        struct ctdb_local_message);
880         int res;
881
882         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
883         if (res != 0) {
884                 DEBUG(0, (__location__ " Failed to dispatch message for srvid=%llu\n", 
885                           (unsigned long long)m->srvid));
886         }
887         talloc_free(m);
888 }
889
890 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
891 {
892         struct ctdb_local_message *m;
893         m = talloc(ctdb, struct ctdb_local_message);
894         CTDB_NO_MEMORY(ctdb, m);
895
896         m->ctdb = ctdb;
897         m->srvid = srvid;
898         m->data  = data;
899         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
900         if (m->data.dptr == NULL) {
901                 talloc_free(m);
902                 return -1;
903         }
904
905         /* this needs to be done as an event to prevent recursion */
906         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
907         return 0;
908 }
909
910 /*
911   send a ctdb message
912 */
913 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
914                              uint64_t srvid, TDB_DATA data)
915 {
916         struct ctdb_req_message *r;
917         int len;
918
919         /* see if this is a message to ourselves */
920         if (pnn == ctdb->pnn) {
921                 return ctdb_local_message(ctdb, srvid, data);
922         }
923
924         len = offsetof(struct ctdb_req_message, data) + data.dsize;
925         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
926                                     struct ctdb_req_message);
927         CTDB_NO_MEMORY(ctdb, r);
928
929         r->hdr.destnode  = pnn;
930         r->srvid         = srvid;
931         r->datalen       = data.dsize;
932         memcpy(&r->data[0], data.dptr, data.dsize);
933
934         ctdb_queue_packet(ctdb, &r->hdr);
935
936         talloc_free(r);
937         return 0;
938 }
939