scripts: Add support for CTDB_DBDIR in tmpfs
[ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32
33 struct ctdb_client_pid_list {
34         struct ctdb_client_pid_list *next, *prev;
35         struct ctdb_context *ctdb;
36         pid_t pid;
37         struct ctdb_client *client;
38 };
39
40 const char *ctdbd_pidfile = NULL;
41
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
43
44 static void print_exit_message(void)
45 {
46         if (debug_extra != NULL && debug_extra[0] != '\0') {
47                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48         } else {
49                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
50
51                 /* Wait a second to allow pending log messages to be flushed */
52                 sleep(1);
53         }
54 }
55
56
57
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
59                                   struct timeval t, void *private_data)
60 {
61         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
62
63         if (getpid() != ctdb->ctdbd_pid) {
64                 return;
65         }
66
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72 /* Used to trigger a dummy event once per second, to make
73  * detection of hangs more reliable.
74  */
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
76 {
77         event_add_timed(ctdb->ev, ctdb, 
78                         timeval_current_ofs(1, 0), 
79                         ctdb_time_tick, ctdb);
80 }
81
82 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
83 {
84         /* start monitoring for connected/disconnected nodes */
85         ctdb_start_keepalive(ctdb);
86
87         /* start periodic update of tcp tickle lists */
88         ctdb_start_tcp_tickle_update(ctdb);
89
90         /* start listening for recovery daemon pings */
91         ctdb_control_recd_ping(ctdb);
92
93         /* start listening to timer ticks */
94         ctdb_start_time_tickd(ctdb);
95 }
96
97 static void ignore_signal(int signum)
98 {
99         struct sigaction act;
100
101         memset(&act, 0, sizeof(act));
102
103         act.sa_handler = SIG_IGN;
104         sigemptyset(&act.sa_mask);
105         sigaddset(&act.sa_mask, signum);
106         sigaction(signum, &act, NULL);
107 }
108
109
110 /*
111   send a packet to a client
112  */
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
114 {
115         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116         if (hdr->operation == CTDB_REQ_MESSAGE) {
117                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
119                         talloc_free(client);
120                         return -1;
121                 }
122         }
123         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
124 }
125
126 /*
127   message handler for when we are in daemon mode. This redirects the message
128   to the right client
129  */
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
131                                     TDB_DATA data, void *private_data)
132 {
133         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134         struct ctdb_req_message *r;
135         int len;
136
137         /* construct a message to send to the client containing the data */
138         len = offsetof(struct ctdb_req_message, data) + data.dsize;
139         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
140                                len, struct ctdb_req_message);
141         CTDB_NO_MEMORY_VOID(ctdb, r);
142
143         talloc_set_name_const(r, "req_message packet");
144
145         r->srvid         = srvid;
146         r->datalen       = data.dsize;
147         memcpy(&r->data[0], data.dptr, data.dsize);
148
149         daemon_queue_send(client, &r->hdr);
150
151         talloc_free(r);
152 }
153
154 /*
155   this is called when the ctdb daemon received a ctdb request to 
156   set the srvid from the client
157  */
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
159 {
160         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
161         int res;
162         if (client == NULL) {
163                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
164                 return -1;
165         }
166         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
167         if (res != 0) {
168                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
169                          (unsigned long long)srvid));
170         } else {
171                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
172                          (unsigned long long)srvid));
173         }
174
175         return res;
176 }
177
178 /*
179   this is called when the ctdb daemon received a ctdb request to 
180   remove a srvid from the client
181  */
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
183 {
184         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185         if (client == NULL) {
186                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
187                 return -1;
188         }
189         return ctdb_deregister_message_handler(ctdb, srvid, client);
190 }
191
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
193                         TDB_DATA *outdata)
194 {
195         uint64_t *ids;
196         int i, num_ids;
197         uint8_t *results;
198
199         if ((indata.dsize % sizeof(uint64_t)) != 0) {
200                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201                                   "size=%d\n", (int)indata.dsize));
202                 return -1;
203         }
204
205         ids = (uint64_t *)indata.dptr;
206         num_ids = indata.dsize / 8;
207
208         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209         if (results == NULL) {
210                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
211                 return -1;
212         }
213         for (i=0; i<num_ids; i++) {
214                 if (ctdb_check_message_handler(ctdb, ids[i])) {
215                         results[i/8] |= (1 << (i%8));
216                 }
217         }
218         outdata->dptr = (uint8_t *)results;
219         outdata->dsize = talloc_get_size(results);
220         return 0;
221 }
222
223 /*
224   destroy a ctdb_client
225 */
226 static int ctdb_client_destructor(struct ctdb_client *client)
227 {
228         struct ctdb_db_context *ctdb_db;
229
230         ctdb_takeover_client_destructor_hook(client);
231         ctdb_reqid_remove(client->ctdb, client->client_id);
232         client->ctdb->num_clients--;
233
234         if (client->num_persistent_updates != 0) {
235                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
237         }
238         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
239         if (ctdb_db) {
240                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241                                   "commit active. Forcing recovery.\n"));
242                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
243
244                 /*
245                  * trans3 transaction state:
246                  *
247                  * The destructor sets the pointer to NULL.
248                  */
249                 talloc_free(ctdb_db->persistent_state);
250         }
251
252         return 0;
253 }
254
255
256 /*
257   this is called when the ctdb daemon received a ctdb request message
258   from a local client over the unix domain socket
259  */
260 static void daemon_request_message_from_client(struct ctdb_client *client, 
261                                                struct ctdb_req_message *c)
262 {
263         TDB_DATA data;
264         int res;
265
266         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
267                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
268         }
269
270         /* maybe the message is for another client on this node */
271         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
272                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
273                 return;
274         }
275
276         /* its for a remote node */
277         data.dptr = &c->data[0];
278         data.dsize = c->datalen;
279         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
280                                        c->srvid, data);
281         if (res != 0) {
282                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
283                          c->hdr.destnode));
284         }
285 }
286
287
288 struct daemon_call_state {
289         struct ctdb_client *client;
290         uint32_t reqid;
291         struct ctdb_call *call;
292         struct timeval start_time;
293
294         /* readonly request ? */
295         uint32_t readonly_fetch;
296         uint32_t client_callid;
297 };
298
299 /* 
300    complete a call from a client 
301 */
302 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
303 {
304         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
305                                                            struct daemon_call_state);
306         struct ctdb_reply_call *r;
307         int res;
308         uint32_t length;
309         struct ctdb_client *client = dstate->client;
310         struct ctdb_db_context *ctdb_db = state->ctdb_db;
311
312         talloc_steal(client, dstate);
313         talloc_steal(dstate, dstate->call);
314
315         res = ctdb_daemon_call_recv(state, dstate->call);
316         if (res != 0) {
317                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
318                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
319
320                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
321                 return;
322         }
323
324         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
325         /* If the client asked for readonly FETCH, we remapped this to 
326            FETCH_WITH_HEADER when calling the daemon. So we must
327            strip the extra header off the reply data before passing
328            it back to the client.
329         */
330         if (dstate->readonly_fetch
331         && dstate->client_callid == CTDB_FETCH_FUNC) {
332                 length -= sizeof(struct ctdb_ltdb_header);
333         }
334
335         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
336                                length, struct ctdb_reply_call);
337         if (r == NULL) {
338                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
339                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
340                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
341                 return;
342         }
343         r->hdr.reqid        = dstate->reqid;
344         r->status           = dstate->call->status;
345
346         if (dstate->readonly_fetch
347         && dstate->client_callid == CTDB_FETCH_FUNC) {
348                 /* client only asked for a FETCH so we must strip off
349                    the extra ctdb_ltdb header
350                 */
351                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
352                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
353         } else {
354                 r->datalen          = dstate->call->reply_data.dsize;
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
356         }
357
358         res = daemon_queue_send(client, &r->hdr);
359         if (res == -1) {
360                 /* client is dead - return immediately */
361                 return;
362         }
363         if (res != 0) {
364                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
365         }
366         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
367         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
368         talloc_free(dstate);
369 }
370
371 struct ctdb_daemon_packet_wrap {
372         struct ctdb_context *ctdb;
373         uint32_t client_id;
374 };
375
376 /*
377   a wrapper to catch disconnected clients
378  */
379 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
380 {
381         struct ctdb_client *client;
382         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
383                                                             struct ctdb_daemon_packet_wrap);
384         if (w == NULL) {
385                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
386                 return;
387         }
388
389         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
390         if (client == NULL) {
391                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
392                          w->client_id));
393                 talloc_free(w);
394                 return;
395         }
396         talloc_free(w);
397
398         /* process it */
399         daemon_incoming_packet(client, hdr);    
400 }
401
402 struct ctdb_deferred_fetch_call {
403         struct ctdb_deferred_fetch_call *next, *prev;
404         struct ctdb_req_call *c;
405         struct ctdb_daemon_packet_wrap *w;
406 };
407
408 struct ctdb_deferred_fetch_queue {
409         struct ctdb_deferred_fetch_call *deferred_calls;
410 };
411
412 struct ctdb_deferred_requeue {
413         struct ctdb_deferred_fetch_call *dfc;
414         struct ctdb_client *client;
415 };
416
417 /* called from a timer event and starts reprocessing the deferred call.*/
418 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
419                                        struct timeval t, void *private_data)
420 {
421         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
422         struct ctdb_client *client = dfr->client;
423
424         talloc_steal(client, dfr->dfc->c);
425         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
426         talloc_free(dfr);
427 }
428
429 /* the referral context is destroyed either after a timeout or when the initial
430    fetch-lock has finished.
431    at this stage, immediately start reprocessing the queued up deferred
432    calls so they get reprocessed immediately (and since we are dmaster at
433    this stage, trigger the waiting smbd processes to pick up and aquire the
434    record right away.
435 */
436 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
437 {
438
439         /* need to reprocess the packets from the queue explicitely instead of
440            just using a normal destructor since we want, need, to
441            call the clients in the same oder as the requests queued up
442         */
443         while (dfq->deferred_calls != NULL) {
444                 struct ctdb_client *client;
445                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
446                 struct ctdb_deferred_requeue *dfr;
447
448                 DLIST_REMOVE(dfq->deferred_calls, dfc);
449
450                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
451                 if (client == NULL) {
452                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
453                                  dfc->w->client_id));
454                         continue;
455                 }
456
457                 /* process it by pushing it back onto the eventloop */
458                 dfr = talloc(client, struct ctdb_deferred_requeue);
459                 if (dfr == NULL) {
460                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
461                         continue;
462                 }
463
464                 dfr->dfc    = talloc_steal(dfr, dfc);
465                 dfr->client = client;
466
467                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
468         }
469
470         return 0;
471 }
472
473 /* insert the new deferral context into the rb tree.
474    there should never be a pre-existing context here, but check for it
475    warn and destroy the previous context if there is already a deferral context
476    for this key.
477 */
478 static void *insert_dfq_callback(void *parm, void *data)
479 {
480         if (data) {
481                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
482                 talloc_free(data);
483         }
484         return parm;
485 }
486
487 /* if the original fetch-lock did not complete within a reasonable time,
488    free the context and context for all deferred requests to cause them to be
489    re-inserted into the event system.
490 */
491 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
492                                   struct timeval t, void *private_data)
493 {
494         talloc_free(private_data);
495 }
496
497 /* This function is used in the local daemon to register a KEY in a database
498    for being "fetched"
499    While the remote fetch is in-flight, any futher attempts to re-fetch the
500    same record will be deferred until the fetch completes.
501 */
502 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
503 {
504         uint32_t *k;
505         struct ctdb_deferred_fetch_queue *dfq;
506
507         k = ctdb_key_to_idkey(call, call->key);
508         if (k == NULL) {
509                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
510                 return -1;
511         }
512
513         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
514         if (dfq == NULL) {
515                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
516                 talloc_free(k);
517                 return -1;
518         }
519         dfq->deferred_calls = NULL;
520
521         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
522
523         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
524
525         /* if the fetch havent completed in 30 seconds, just tear it all down
526            and let it try again as the events are reissued */
527         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
528
529         talloc_free(k);
530         return 0;
531 }
532
533 /* check if this is a duplicate request to a fetch already in-flight
534    if it is, make this call deferred to be reprocessed later when
535    the in-flight fetch completes.
536 */
537 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
538 {
539         uint32_t *k;
540         struct ctdb_deferred_fetch_queue *dfq;
541         struct ctdb_deferred_fetch_call *dfc;
542
543         k = ctdb_key_to_idkey(c, key);
544         if (k == NULL) {
545                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
546                 return -1;
547         }
548
549         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
550         if (dfq == NULL) {
551                 talloc_free(k);
552                 return -1;
553         }
554
555
556         talloc_free(k);
557
558         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
559         if (dfc == NULL) {
560                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
561                 return -1;
562         }
563
564         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
565         if (dfc->w == NULL) {
566                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
567                 talloc_free(dfc);
568                 return -1;
569         }
570
571         dfc->c = talloc_steal(dfc, c);
572         dfc->w->ctdb = ctdb_db->ctdb;
573         dfc->w->client_id = client->client_id;
574
575         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
576
577         return 0;
578 }
579
580
581 /*
582   this is called when the ctdb daemon received a ctdb request call
583   from a local client over the unix domain socket
584  */
585 static void daemon_request_call_from_client(struct ctdb_client *client, 
586                                             struct ctdb_req_call *c)
587 {
588         struct ctdb_call_state *state;
589         struct ctdb_db_context *ctdb_db;
590         struct daemon_call_state *dstate;
591         struct ctdb_call *call;
592         struct ctdb_ltdb_header header;
593         TDB_DATA key, data;
594         int ret;
595         struct ctdb_context *ctdb = client->ctdb;
596         struct ctdb_daemon_packet_wrap *w;
597
598         CTDB_INCREMENT_STAT(ctdb, total_calls);
599         CTDB_INCREMENT_STAT(ctdb, pending_calls);
600
601         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
602         if (!ctdb_db) {
603                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
604                           c->db_id));
605                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
606                 return;
607         }
608
609         if (ctdb_db->unhealthy_reason) {
610                 /*
611                  * this is just a warning, as the tdb should be empty anyway,
612                  * and only persistent databases can be unhealthy, which doesn't
613                  * use this code patch
614                  */
615                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
616                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
617         }
618
619         key.dptr = c->data;
620         key.dsize = c->keylen;
621
622         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
623         CTDB_NO_MEMORY_VOID(ctdb, w);   
624
625         w->ctdb = ctdb;
626         w->client_id = client->client_id;
627
628         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
629                                            (struct ctdb_req_header *)c, &data,
630                                            daemon_incoming_packet_wrap, w, true);
631         if (ret == -2) {
632                 /* will retry later */
633                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
634                 return;
635         }
636
637         talloc_free(w);
638
639         if (ret != 0) {
640                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
641                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
642                 return;
643         }
644
645
646         /* check if this fetch request is a duplicate for a
647            request we already have in flight. If so defer it until
648            the first request completes.
649         */
650         if (ctdb->tunable.fetch_collapse == 1) {
651                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
652                         ret = ctdb_ltdb_unlock(ctdb_db, key);
653                         if (ret != 0) {
654                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
655                         }
656                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
657                         return;
658                 }
659         }
660
661         /* Dont do READONLY if we dont have a tracking database */
662         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
663                 c->flags &= ~CTDB_WANT_READONLY;
664         }
665
666         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
667                 header.flags &= ~CTDB_REC_RO_FLAGS;
668                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
669                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
670                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
671                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
672                 }
673                 /* and clear out the tracking data */
674                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
675                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
676                 }
677         }
678
679         /* if we are revoking, we must defer all other calls until the revoke
680          * had completed.
681          */
682         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
683                 talloc_free(data.dptr);
684                 ret = ctdb_ltdb_unlock(ctdb_db, key);
685
686                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
687                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
688                 }
689                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
690                 return;
691         }
692
693         if ((header.dmaster == ctdb->pnn)
694         && (!(c->flags & CTDB_WANT_READONLY))
695         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
696                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
697                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
698                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
699                 }
700                 ret = ctdb_ltdb_unlock(ctdb_db, key);
701
702                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
703                         ctdb_fatal(ctdb, "Failed to start record revoke");
704                 }
705                 talloc_free(data.dptr);
706
707                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
708                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
709                 }
710
711                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
712                 return;
713         }               
714
715         dstate = talloc(client, struct daemon_call_state);
716         if (dstate == NULL) {
717                 ret = ctdb_ltdb_unlock(ctdb_db, key);
718                 if (ret != 0) {
719                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
720                 }
721
722                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
723                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
724                 return;
725         }
726         dstate->start_time = timeval_current();
727         dstate->client = client;
728         dstate->reqid  = c->hdr.reqid;
729         talloc_steal(dstate, data.dptr);
730
731         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
732         if (call == NULL) {
733                 ret = ctdb_ltdb_unlock(ctdb_db, key);
734                 if (ret != 0) {
735                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
736                 }
737
738                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
739                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
740                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
741                 return;
742         }
743
744         dstate->readonly_fetch = 0;
745         call->call_id = c->callid;
746         call->key = key;
747         call->call_data.dptr = c->data + c->keylen;
748         call->call_data.dsize = c->calldatalen;
749         call->flags = c->flags;
750
751         if (c->flags & CTDB_WANT_READONLY) {
752                 /* client wants readonly record, so translate this into a 
753                    fetch with header. remember what the client asked for
754                    so we can remap the reply back to the proper format for
755                    the client in the reply
756                  */
757                 dstate->client_callid = call->call_id;
758                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
759                 dstate->readonly_fetch = 1;
760         }
761
762         if (header.dmaster == ctdb->pnn) {
763                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
764         } else {
765                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
766                 if (ctdb->tunable.fetch_collapse == 1) {
767                         /* This request triggered a remote fetch-lock.
768                            set up a deferral for this key so any additional
769                            fetch-locks are deferred until the current one
770                            finishes.
771                          */
772                         setup_deferred_fetch_locks(ctdb_db, call);
773                 }
774         }
775
776         ret = ctdb_ltdb_unlock(ctdb_db, key);
777         if (ret != 0) {
778                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
779         }
780
781         if (state == NULL) {
782                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
783                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
784                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
785                 return;
786         }
787         talloc_steal(state, dstate);
788         talloc_steal(client, state);
789
790         state->async.fn = daemon_call_from_client_callback;
791         state->async.private_data = dstate;
792 }
793
794
795 static void daemon_request_control_from_client(struct ctdb_client *client, 
796                                                struct ctdb_req_control *c);
797
798 /* data contains a packet from the client */
799 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
800 {
801         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
802         TALLOC_CTX *tmp_ctx;
803         struct ctdb_context *ctdb = client->ctdb;
804
805         /* place the packet as a child of a tmp_ctx. We then use
806            talloc_free() below to free it. If any of the calls want
807            to keep it, then they will steal it somewhere else, and the
808            talloc_free() will be a no-op */
809         tmp_ctx = talloc_new(client);
810         talloc_steal(tmp_ctx, hdr);
811
812         if (hdr->ctdb_magic != CTDB_MAGIC) {
813                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
814                 goto done;
815         }
816
817         if (hdr->ctdb_version != CTDB_VERSION) {
818                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
819                 goto done;
820         }
821
822         switch (hdr->operation) {
823         case CTDB_REQ_CALL:
824                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
825                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
826                 break;
827
828         case CTDB_REQ_MESSAGE:
829                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
830                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
831                 break;
832
833         case CTDB_REQ_CONTROL:
834                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
835                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
836                 break;
837
838         default:
839                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
840                          hdr->operation));
841         }
842
843 done:
844         talloc_free(tmp_ctx);
845 }
846
847 /*
848   called when the daemon gets a incoming packet
849  */
850 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
851 {
852         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
853         struct ctdb_req_header *hdr;
854
855         if (cnt == 0) {
856                 talloc_free(client);
857                 return;
858         }
859
860         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
861
862         if (cnt < sizeof(*hdr)) {
863                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
864                                (unsigned)cnt);
865                 return;
866         }
867         hdr = (struct ctdb_req_header *)data;
868         if (cnt != hdr->length) {
869                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
870                                (unsigned)hdr->length, (unsigned)cnt);
871                 return;
872         }
873
874         if (hdr->ctdb_magic != CTDB_MAGIC) {
875                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
876                 return;
877         }
878
879         if (hdr->ctdb_version != CTDB_VERSION) {
880                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
881                 return;
882         }
883
884         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
885                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
886                  hdr->srcnode, hdr->destnode));
887
888         /* it is the responsibility of the incoming packet function to free 'data' */
889         daemon_incoming_packet(client, hdr);
890 }
891
892
893 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
894 {
895         if (client_pid->ctdb->client_pids != NULL) {
896                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
897         }
898
899         return 0;
900 }
901
902
903 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
904                          uint16_t flags, void *private_data)
905 {
906         struct sockaddr_un addr;
907         socklen_t len;
908         int fd;
909         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
910         struct ctdb_client *client;
911         struct ctdb_client_pid_list *client_pid;
912         pid_t peer_pid = 0;
913
914         memset(&addr, 0, sizeof(addr));
915         len = sizeof(addr);
916         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
917         if (fd == -1) {
918                 return;
919         }
920
921         set_nonblocking(fd);
922         set_close_on_exec(fd);
923
924         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
925
926         client = talloc_zero(ctdb, struct ctdb_client);
927         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
928                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
929         }
930
931         client->ctdb = ctdb;
932         client->fd = fd;
933         client->client_id = ctdb_reqid_new(ctdb, client);
934         client->pid = peer_pid;
935
936         client_pid = talloc(client, struct ctdb_client_pid_list);
937         if (client_pid == NULL) {
938                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
939                 close(fd);
940                 talloc_free(client);
941                 return;
942         }               
943         client_pid->ctdb   = ctdb;
944         client_pid->pid    = peer_pid;
945         client_pid->client = client;
946
947         DLIST_ADD(ctdb->client_pids, client_pid);
948
949         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
950                                          ctdb_daemon_read_cb, client,
951                                          "client-%u", client->pid);
952
953         talloc_set_destructor(client, ctdb_client_destructor);
954         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
955         ctdb->num_clients++;
956 }
957
958
959
960 /*
961   create a unix domain socket and bind it
962   return a file descriptor open on the socket 
963 */
964 static int ux_socket_bind(struct ctdb_context *ctdb)
965 {
966         struct sockaddr_un addr;
967
968         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
969         if (ctdb->daemon.sd == -1) {
970                 return -1;
971         }
972
973         memset(&addr, 0, sizeof(addr));
974         addr.sun_family = AF_UNIX;
975         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
976
977         /* First check if an old ctdbd might be running */
978         if (connect(ctdb->daemon.sd,
979                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
980                 DEBUG(DEBUG_CRIT,
981                       ("Something is already listening on ctdb socket '%s'\n",
982                        ctdb->daemon.name));
983                 goto failed;
984         }
985
986         /* Remove any old socket */
987         unlink(ctdb->daemon.name);
988
989         set_close_on_exec(ctdb->daemon.sd);
990         set_nonblocking(ctdb->daemon.sd);
991
992         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
993                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
994                 goto failed;
995         }
996
997         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
998             chmod(ctdb->daemon.name, 0700) != 0) {
999                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1000                 goto failed;
1001         }
1002
1003
1004         if (listen(ctdb->daemon.sd, 100) != 0) {
1005                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1006                 goto failed;
1007         }
1008
1009         return 0;
1010
1011 failed:
1012         close(ctdb->daemon.sd);
1013         ctdb->daemon.sd = -1;
1014         return -1;      
1015 }
1016
1017 static void initialise_node_flags (struct ctdb_context *ctdb)
1018 {
1019         if (ctdb->pnn == -1) {
1020                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1021         }
1022
1023         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1024
1025         /* do we start out in DISABLED mode? */
1026         if (ctdb->start_as_disabled != 0) {
1027                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1028                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1029         }
1030         /* do we start out in STOPPED mode? */
1031         if (ctdb->start_as_stopped != 0) {
1032                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1033                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1034         }
1035 }
1036
1037 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1038                                       void *private_data)
1039 {
1040         if (status != 0) {
1041                 ctdb_die(ctdb, "Failed to run setup event");
1042         }
1043         ctdb_run_notification_script(ctdb, "setup");
1044
1045         /* tell all other nodes we've just started up */
1046         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1047                                  0, CTDB_CONTROL_STARTUP, 0,
1048                                  CTDB_CTRL_FLAG_NOREPLY,
1049                                  tdb_null, NULL, NULL);
1050
1051         /* Start the recovery daemon */
1052         if (ctdb_start_recoverd(ctdb) != 0) {
1053                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1054                 exit(11);
1055         }
1056
1057         ctdb_start_periodic_events(ctdb);
1058
1059         ctdb_wait_for_first_recovery(ctdb);
1060 }
1061
1062 static struct timeval tevent_before_wait_ts;
1063 static struct timeval tevent_after_wait_ts;
1064
1065 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1066                               void *private_data)
1067 {
1068         struct timeval diff;
1069         struct timeval now;
1070         struct ctdb_context *ctdb =
1071                 talloc_get_type(private_data, struct ctdb_context);
1072
1073         if (getpid() != ctdb->ctdbd_pid) {
1074                 return;
1075         }
1076
1077         now = timeval_current();
1078
1079         switch (tp) {
1080         case TEVENT_TRACE_BEFORE_WAIT:
1081                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1082                         diff = timeval_until(&tevent_after_wait_ts, &now);
1083                         if (diff.tv_sec > 3) {
1084                                 DEBUG(DEBUG_ERR,
1085                                       ("Handling event took %ld seconds!\n",
1086                                        diff.tv_sec));
1087                         }
1088                 }
1089                 tevent_before_wait_ts = now;
1090                 break;
1091
1092         case TEVENT_TRACE_AFTER_WAIT:
1093                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1094                         diff = timeval_until(&tevent_before_wait_ts, &now);
1095                         if (diff.tv_sec > 3) {
1096                                 DEBUG(DEBUG_CRIT,
1097                                       ("No event for %ld seconds!\n",
1098                                        diff.tv_sec));
1099                         }
1100                 }
1101                 tevent_after_wait_ts = now;
1102                 break;
1103
1104         default:
1105                 /* Do nothing for future tevent trace points */ ;
1106         }
1107 }
1108
1109 static void ctdb_remove_pidfile(void)
1110 {
1111         /* Only the main ctdbd's PID matches the SID */
1112         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1113                 if (unlink(ctdbd_pidfile) == 0) {
1114                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1115                                              ctdbd_pidfile));
1116                 } else {
1117                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1118                                               ctdbd_pidfile));
1119                 }
1120         }
1121 }
1122
1123 static void ctdb_create_pidfile(pid_t pid)
1124 {
1125         if (ctdbd_pidfile != NULL) {
1126                 FILE *fp;
1127
1128                 fp = fopen(ctdbd_pidfile, "w");
1129                 if (fp == NULL) {
1130                         DEBUG(DEBUG_ALERT,
1131                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1132                         exit(11);
1133                 }
1134
1135                 fprintf(fp, "%d\n", pid);
1136                 fclose(fp);
1137                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1138                 atexit(ctdb_remove_pidfile);
1139         }
1140 }
1141
1142 /*
1143   start the protocol going as a daemon
1144 */
1145 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
1146 {
1147         int res, ret = -1;
1148         struct fd_event *fde;
1149         const char *domain_socket_name;
1150
1151         /* create a unix domain stream socket to listen to */
1152         res = ux_socket_bind(ctdb);
1153         if (res!=0) {
1154                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1155                 exit(10);
1156         }
1157
1158         if (do_fork && fork()) {
1159                 return 0;
1160         }
1161
1162         tdb_reopen_all(false);
1163
1164         if (do_fork) {
1165                 if (setsid() == -1) {
1166                         ctdb_die(ctdb, "Failed to setsid()\n");
1167                 }
1168                 close(0);
1169                 if (open("/dev/null", O_RDONLY) != 0) {
1170                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1171                         exit(11);
1172                 }
1173         }
1174         ignore_signal(SIGPIPE);
1175         ignore_signal(SIGUSR1);
1176
1177         ctdb->ctdbd_pid = getpid();
1178         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1179                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1180         ctdb_create_pidfile(ctdb->ctdbd_pid);
1181
1182         /* Make sure we log something when the daemon terminates.
1183          * This must be the first exit handler to run (so the last to
1184          * be registered.
1185          */
1186         atexit(print_exit_message);
1187
1188         if (ctdb->do_setsched) {
1189                 /* try to set us up as realtime */
1190                 if (!set_scheduler()) {
1191                         exit(1);
1192                 }
1193                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1194         }
1195
1196         /* ensure the socket is deleted on exit of the daemon */
1197         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1198         if (domain_socket_name == NULL) {
1199                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1200                 exit(12);
1201         }
1202
1203         ctdb->ev = event_context_init(NULL);
1204         tevent_loop_allow_nesting(ctdb->ev);
1205         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1206         ret = ctdb_init_tevent_logging(ctdb);
1207         if (ret != 0) {
1208                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1209                 exit(1);
1210         }
1211
1212         /* set up a handler to pick up sigchld */
1213         if (ctdb_init_sigchld(ctdb) == NULL) {
1214                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1215                 exit(1);
1216         }
1217
1218         ctdb_set_child_logging(ctdb);
1219         if (use_syslog) {
1220                 if (start_syslog_daemon(ctdb)) {
1221                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1222                         exit(10);
1223                 }
1224         }
1225
1226         /* initialize statistics collection */
1227         ctdb_statistics_init(ctdb);
1228
1229         /* force initial recovery for election */
1230         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1231
1232         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1233         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1234         if (ret != 0) {
1235                 ctdb_die(ctdb, "Failed to run init event\n");
1236         }
1237         ctdb_run_notification_script(ctdb, "init");
1238
1239         if (strcmp(ctdb->transport, "tcp") == 0) {
1240                 ret = ctdb_tcp_init(ctdb);
1241         }
1242 #ifdef USE_INFINIBAND
1243         if (strcmp(ctdb->transport, "ib") == 0) {
1244                 ret = ctdb_ibw_init(ctdb);
1245         }
1246 #endif
1247         if (ret != 0) {
1248                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1249                 return -1;
1250         }
1251
1252         if (ctdb->methods == NULL) {
1253                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1254                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1255         }
1256
1257         /* initialise the transport  */
1258         if (ctdb->methods->initialise(ctdb) != 0) {
1259                 ctdb_fatal(ctdb, "transport failed to initialise");
1260         }
1261
1262         initialise_node_flags(ctdb);
1263
1264         if (ctdb->public_addresses_file) {
1265                 ret = ctdb_set_public_addresses(ctdb, true);
1266                 if (ret == -1) {
1267                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1268                         exit(1);
1269                 }
1270                 if (ctdb->do_checkpublicip) {
1271                         ctdb_start_monitoring_interfaces(ctdb);
1272                 }
1273         }
1274
1275
1276         /* attach to existing databases */
1277         if (ctdb_attach_databases(ctdb) != 0) {
1278                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1279         }
1280
1281         /* start frozen, then let the first election sort things out */
1282         if (!ctdb_blocking_freeze(ctdb)) {
1283                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1284         }
1285
1286         /* now start accepting clients, only can do this once frozen */
1287         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1288                            EVENT_FD_READ,
1289                            ctdb_accept_client, ctdb);
1290         if (fde == NULL) {
1291                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1292         }
1293         tevent_fd_set_auto_close(fde);
1294
1295         /* release any IPs we hold from previous runs of the daemon */
1296         if (ctdb->tunable.disable_ip_failover == 0) {
1297                 ctdb_release_all_ips(ctdb);
1298         }
1299
1300         /* Start the transport */
1301         if (ctdb->methods->start(ctdb) != 0) {
1302                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1303                 ctdb_fatal(ctdb, "transport failed to start");
1304         }
1305
1306         /* Recovery daemon and timed events are started from the
1307          * callback, only after the setup event completes
1308          * successfully.
1309          */
1310         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1311         ret = ctdb_event_script_callback(ctdb,
1312                                          ctdb,
1313                                          ctdb_setup_event_callback,
1314                                          ctdb,
1315                                          CTDB_EVENT_SETUP,
1316                                          "%s",
1317                                          "");
1318         if (ret != 0) {
1319                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1320                 exit(1);
1321         }
1322
1323         lockdown_memory(ctdb->valgrinding);
1324
1325         /* go into a wait loop to allow other nodes to complete */
1326         event_loop_wait(ctdb->ev);
1327
1328         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1329         exit(1);
1330 }
1331
1332 /*
1333   allocate a packet for use in daemon<->daemon communication
1334  */
1335 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1336                                                  TALLOC_CTX *mem_ctx, 
1337                                                  enum ctdb_operation operation, 
1338                                                  size_t length, size_t slength,
1339                                                  const char *type)
1340 {
1341         int size;
1342         struct ctdb_req_header *hdr;
1343
1344         length = MAX(length, slength);
1345         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1346
1347         if (ctdb->methods == NULL) {
1348                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1349                          operation, (unsigned)length));
1350                 return NULL;
1351         }
1352
1353         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1354         if (hdr == NULL) {
1355                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1356                          operation, (unsigned)length));
1357                 return NULL;
1358         }
1359         talloc_set_name_const(hdr, type);
1360         memset(hdr, 0, slength);
1361         hdr->length       = length;
1362         hdr->operation    = operation;
1363         hdr->ctdb_magic   = CTDB_MAGIC;
1364         hdr->ctdb_version = CTDB_VERSION;
1365         hdr->generation   = ctdb->vnn_map->generation;
1366         hdr->srcnode      = ctdb->pnn;
1367
1368         return hdr;     
1369 }
1370
1371 struct daemon_control_state {
1372         struct daemon_control_state *next, *prev;
1373         struct ctdb_client *client;
1374         struct ctdb_req_control *c;
1375         uint32_t reqid;
1376         struct ctdb_node *node;
1377 };
1378
1379 /*
1380   callback when a control reply comes in
1381  */
1382 static void daemon_control_callback(struct ctdb_context *ctdb,
1383                                     int32_t status, TDB_DATA data, 
1384                                     const char *errormsg,
1385                                     void *private_data)
1386 {
1387         struct daemon_control_state *state = talloc_get_type(private_data, 
1388                                                              struct daemon_control_state);
1389         struct ctdb_client *client = state->client;
1390         struct ctdb_reply_control *r;
1391         size_t len;
1392         int ret;
1393
1394         /* construct a message to send to the client containing the data */
1395         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1396         if (errormsg) {
1397                 len += strlen(errormsg);
1398         }
1399         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1400                                struct ctdb_reply_control);
1401         CTDB_NO_MEMORY_VOID(ctdb, r);
1402
1403         r->hdr.reqid     = state->reqid;
1404         r->status        = status;
1405         r->datalen       = data.dsize;
1406         r->errorlen = 0;
1407         memcpy(&r->data[0], data.dptr, data.dsize);
1408         if (errormsg) {
1409                 r->errorlen = strlen(errormsg);
1410                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1411         }
1412
1413         ret = daemon_queue_send(client, &r->hdr);
1414         if (ret != -1) {
1415                 talloc_free(state);
1416         }
1417 }
1418
1419 /*
1420   fail all pending controls to a disconnected node
1421  */
1422 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1423 {
1424         struct daemon_control_state *state;
1425         while ((state = node->pending_controls)) {
1426                 DLIST_REMOVE(node->pending_controls, state);
1427                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1428                                         "node is disconnected", state);
1429         }
1430 }
1431
1432 /*
1433   destroy a daemon_control_state
1434  */
1435 static int daemon_control_destructor(struct daemon_control_state *state)
1436 {
1437         if (state->node) {
1438                 DLIST_REMOVE(state->node->pending_controls, state);
1439         }
1440         return 0;
1441 }
1442
1443 /*
1444   this is called when the ctdb daemon received a ctdb request control
1445   from a local client over the unix domain socket
1446  */
1447 static void daemon_request_control_from_client(struct ctdb_client *client, 
1448                                                struct ctdb_req_control *c)
1449 {
1450         TDB_DATA data;
1451         int res;
1452         struct daemon_control_state *state;
1453         TALLOC_CTX *tmp_ctx = talloc_new(client);
1454
1455         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1456                 c->hdr.destnode = client->ctdb->pnn;
1457         }
1458
1459         state = talloc(client, struct daemon_control_state);
1460         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1461
1462         state->client = client;
1463         state->c = talloc_steal(state, c);
1464         state->reqid = c->hdr.reqid;
1465         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1466                 state->node = client->ctdb->nodes[c->hdr.destnode];
1467                 DLIST_ADD(state->node->pending_controls, state);
1468         } else {
1469                 state->node = NULL;
1470         }
1471
1472         talloc_set_destructor(state, daemon_control_destructor);
1473
1474         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1475                 talloc_steal(tmp_ctx, state);
1476         }
1477         
1478         data.dptr = &c->data[0];
1479         data.dsize = c->datalen;
1480         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1481                                        c->srvid, c->opcode, client->client_id,
1482                                        c->flags,
1483                                        data, daemon_control_callback,
1484                                        state);
1485         if (res != 0) {
1486                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1487                          c->hdr.destnode));
1488         }
1489
1490         talloc_free(tmp_ctx);
1491 }
1492
1493 /*
1494   register a call function
1495 */
1496 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1497                          ctdb_fn_t fn, int id)
1498 {
1499         struct ctdb_registered_call *call;
1500         struct ctdb_db_context *ctdb_db;
1501
1502         ctdb_db = find_ctdb_db(ctdb, db_id);
1503         if (ctdb_db == NULL) {
1504                 return -1;
1505         }
1506
1507         call = talloc(ctdb_db, struct ctdb_registered_call);
1508         call->fn = fn;
1509         call->id = id;
1510
1511         DLIST_ADD(ctdb_db->calls, call);        
1512         return 0;
1513 }
1514
1515
1516
1517 /*
1518   this local messaging handler is ugly, but is needed to prevent
1519   recursion in ctdb_send_message() when the destination node is the
1520   same as the source node
1521  */
1522 struct ctdb_local_message {
1523         struct ctdb_context *ctdb;
1524         uint64_t srvid;
1525         TDB_DATA data;
1526 };
1527
1528 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1529                                        struct timeval t, void *private_data)
1530 {
1531         struct ctdb_local_message *m = talloc_get_type(private_data, 
1532                                                        struct ctdb_local_message);
1533         int res;
1534
1535         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1536         if (res != 0) {
1537                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1538                           (unsigned long long)m->srvid));
1539         }
1540         talloc_free(m);
1541 }
1542
1543 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1544 {
1545         struct ctdb_local_message *m;
1546         m = talloc(ctdb, struct ctdb_local_message);
1547         CTDB_NO_MEMORY(ctdb, m);
1548
1549         m->ctdb = ctdb;
1550         m->srvid = srvid;
1551         m->data  = data;
1552         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1553         if (m->data.dptr == NULL) {
1554                 talloc_free(m);
1555                 return -1;
1556         }
1557
1558         /* this needs to be done as an event to prevent recursion */
1559         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1560         return 0;
1561 }
1562
1563 /*
1564   send a ctdb message
1565 */
1566 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1567                              uint64_t srvid, TDB_DATA data)
1568 {
1569         struct ctdb_req_message *r;
1570         int len;
1571
1572         if (ctdb->methods == NULL) {
1573                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1574                 return -1;
1575         }
1576
1577         /* see if this is a message to ourselves */
1578         if (pnn == ctdb->pnn) {
1579                 return ctdb_local_message(ctdb, srvid, data);
1580         }
1581
1582         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1583         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1584                                     struct ctdb_req_message);
1585         CTDB_NO_MEMORY(ctdb, r);
1586
1587         r->hdr.destnode  = pnn;
1588         r->srvid         = srvid;
1589         r->datalen       = data.dsize;
1590         memcpy(&r->data[0], data.dptr, data.dsize);
1591
1592         ctdb_queue_packet(ctdb, &r->hdr);
1593
1594         talloc_free(r);
1595         return 0;
1596 }
1597
1598
1599
1600 struct ctdb_client_notify_list {
1601         struct ctdb_client_notify_list *next, *prev;
1602         struct ctdb_context *ctdb;
1603         uint64_t srvid;
1604         TDB_DATA data;
1605 };
1606
1607
1608 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1609 {
1610         int ret;
1611
1612         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1613
1614         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1615         if (ret != 0) {
1616                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1617         }
1618
1619         return 0;
1620 }
1621
1622 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1623 {
1624         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1625         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1626         struct ctdb_client_notify_list *nl;
1627
1628         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1629
1630         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1631                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1632                 return -1;
1633         }
1634
1635         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1636                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1637                 return -1;
1638         }
1639
1640
1641         if (client == NULL) {
1642                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1643                 return -1;
1644         }
1645
1646         for(nl=client->notify; nl; nl=nl->next) {
1647                 if (nl->srvid == notify->srvid) {
1648                         break;
1649                 }
1650         }
1651         if (nl != NULL) {
1652                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1653                 return -1;
1654         }
1655
1656         nl = talloc(client, struct ctdb_client_notify_list);
1657         CTDB_NO_MEMORY(ctdb, nl);
1658         nl->ctdb       = ctdb;
1659         nl->srvid      = notify->srvid;
1660         nl->data.dsize = notify->len;
1661         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1662         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1663         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1664         
1665         DLIST_ADD(client->notify, nl);
1666         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1667
1668         return 0;
1669 }
1670
1671 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1672 {
1673         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1674         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1675         struct ctdb_client_notify_list *nl;
1676
1677         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1678
1679         if (client == NULL) {
1680                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1681                 return -1;
1682         }
1683
1684         for(nl=client->notify; nl; nl=nl->next) {
1685                 if (nl->srvid == notify->srvid) {
1686                         break;
1687                 }
1688         }
1689         if (nl == NULL) {
1690                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1691                 return -1;
1692         }
1693
1694         DLIST_REMOVE(client->notify, nl);
1695         talloc_set_destructor(nl, NULL);
1696         talloc_free(nl);
1697
1698         return 0;
1699 }
1700
1701 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1702 {
1703         struct ctdb_client_pid_list *client_pid;
1704
1705         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1706                 if (client_pid->pid == pid) {
1707                         return client_pid->client;
1708                 }
1709         }
1710         return NULL;
1711 }
1712
1713
1714 /* This control is used by samba when probing if a process (of a samba daemon)
1715    exists on the node.
1716    Samba does this when it needs/wants to check if a subrecord in one of the
1717    databases is still valied, or if it is stale and can be removed.
1718    If the node is in unhealthy or stopped state we just kill of the samba
1719    process holding htis sub-record and return to the calling samba that
1720    the process does not exist.
1721    This allows us to forcefully recall subrecords registered by samba processes
1722    on banned and stopped nodes.
1723 */
1724 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1725 {
1726         struct ctdb_client *client;
1727
1728         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1729                 client = ctdb_find_client_by_pid(ctdb, pid);
1730                 if (client != NULL) {
1731                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1732                         talloc_free(client);
1733                 }
1734                 return -1;
1735         }
1736
1737         return kill(pid, 0);
1738 }
1739
1740 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1741 {
1742         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1743                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1744                 return;
1745         }
1746
1747         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1748         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1749         ctdb_stop_recoverd(ctdb);
1750         ctdb_stop_keepalive(ctdb);
1751         ctdb_stop_monitoring(ctdb);
1752         ctdb_release_all_ips(ctdb);
1753         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1754         if (ctdb->methods != NULL) {
1755                 ctdb->methods->shutdown(ctdb);
1756         }
1757
1758         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1759         exit(exit_code);
1760 }