da2f42e9a97db44e9b740acb7fd027224987cfc7
[obnox/samba/samba-obnox.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32
33 struct ctdb_client_pid_list {
34         struct ctdb_client_pid_list *next, *prev;
35         struct ctdb_context *ctdb;
36         pid_t pid;
37         struct ctdb_client *client;
38 };
39
40 const char *ctdbd_pidfile = NULL;
41
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
43
44 static void print_exit_message(void)
45 {
46         if (debug_extra != NULL && debug_extra[0] != '\0') {
47                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48         } else {
49                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
50
51                 /* Wait a second to allow pending log messages to be flushed */
52                 sleep(1);
53         }
54 }
55
56
57
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
59                                   struct timeval t, void *private_data)
60 {
61         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
62
63         if (getpid() != ctdb->ctdbd_pid) {
64                 return;
65         }
66
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72 /* Used to trigger a dummy event once per second, to make
73  * detection of hangs more reliable.
74  */
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
76 {
77         event_add_timed(ctdb->ev, ctdb, 
78                         timeval_current_ofs(1, 0), 
79                         ctdb_time_tick, ctdb);
80 }
81
82 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
83 {
84         /* start monitoring for connected/disconnected nodes */
85         ctdb_start_keepalive(ctdb);
86
87         /* start periodic update of tcp tickle lists */
88         ctdb_start_tcp_tickle_update(ctdb);
89
90         /* start listening for recovery daemon pings */
91         ctdb_control_recd_ping(ctdb);
92
93         /* start listening to timer ticks */
94         ctdb_start_time_tickd(ctdb);
95 }
96
97 static void ignore_signal(int signum)
98 {
99         struct sigaction act;
100
101         memset(&act, 0, sizeof(act));
102
103         act.sa_handler = SIG_IGN;
104         sigemptyset(&act.sa_mask);
105         sigaddset(&act.sa_mask, signum);
106         sigaction(signum, &act, NULL);
107 }
108
109
110 /*
111   send a packet to a client
112  */
113 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
114 {
115         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
116         if (hdr->operation == CTDB_REQ_MESSAGE) {
117                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
118                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
119                         talloc_free(client);
120                         return -1;
121                 }
122         }
123         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
124 }
125
126 /*
127   message handler for when we are in daemon mode. This redirects the message
128   to the right client
129  */
130 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
131                                     TDB_DATA data, void *private_data)
132 {
133         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
134         struct ctdb_req_message *r;
135         int len;
136
137         /* construct a message to send to the client containing the data */
138         len = offsetof(struct ctdb_req_message, data) + data.dsize;
139         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
140                                len, struct ctdb_req_message);
141         CTDB_NO_MEMORY_VOID(ctdb, r);
142
143         talloc_set_name_const(r, "req_message packet");
144
145         r->srvid         = srvid;
146         r->datalen       = data.dsize;
147         memcpy(&r->data[0], data.dptr, data.dsize);
148
149         daemon_queue_send(client, &r->hdr);
150
151         talloc_free(r);
152 }
153
154 /*
155   this is called when the ctdb daemon received a ctdb request to 
156   set the srvid from the client
157  */
158 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
159 {
160         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
161         int res;
162         if (client == NULL) {
163                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
164                 return -1;
165         }
166         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
167         if (res != 0) {
168                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
169                          (unsigned long long)srvid));
170         } else {
171                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
172                          (unsigned long long)srvid));
173         }
174
175         return res;
176 }
177
178 /*
179   this is called when the ctdb daemon received a ctdb request to 
180   remove a srvid from the client
181  */
182 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
183 {
184         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
185         if (client == NULL) {
186                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
187                 return -1;
188         }
189         return ctdb_deregister_message_handler(ctdb, srvid, client);
190 }
191
192 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
193                         TDB_DATA *outdata)
194 {
195         uint64_t *ids;
196         int i, num_ids;
197         uint8_t *results;
198
199         if ((indata.dsize % sizeof(uint64_t)) != 0) {
200                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
201                                   "size=%d\n", (int)indata.dsize));
202                 return -1;
203         }
204
205         ids = (uint64_t *)indata.dptr;
206         num_ids = indata.dsize / 8;
207
208         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
209         if (results == NULL) {
210                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
211                 return -1;
212         }
213         for (i=0; i<num_ids; i++) {
214                 if (ctdb_check_message_handler(ctdb, ids[i])) {
215                         results[i/8] |= (1 << (i%8));
216                 }
217         }
218         outdata->dptr = (uint8_t *)results;
219         outdata->dsize = talloc_get_size(results);
220         return 0;
221 }
222
223 /*
224   destroy a ctdb_client
225 */
226 static int ctdb_client_destructor(struct ctdb_client *client)
227 {
228         struct ctdb_db_context *ctdb_db;
229
230         ctdb_takeover_client_destructor_hook(client);
231         ctdb_reqid_remove(client->ctdb, client->client_id);
232         client->ctdb->num_clients--;
233
234         if (client->num_persistent_updates != 0) {
235                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
236                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
237         }
238         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
239         if (ctdb_db) {
240                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
241                                   "commit active. Forcing recovery.\n"));
242                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
243
244                 /*
245                  * trans3 transaction state:
246                  *
247                  * The destructor sets the pointer to NULL.
248                  */
249                 talloc_free(ctdb_db->persistent_state);
250         }
251
252         return 0;
253 }
254
255
256 /*
257   this is called when the ctdb daemon received a ctdb request message
258   from a local client over the unix domain socket
259  */
260 static void daemon_request_message_from_client(struct ctdb_client *client, 
261                                                struct ctdb_req_message *c)
262 {
263         TDB_DATA data;
264         int res;
265
266         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
267                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
268         }
269
270         /* maybe the message is for another client on this node */
271         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
272                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
273                 return;
274         }
275
276         /* its for a remote node */
277         data.dptr = &c->data[0];
278         data.dsize = c->datalen;
279         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
280                                        c->srvid, data);
281         if (res != 0) {
282                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
283                          c->hdr.destnode));
284         }
285 }
286
287
288 struct daemon_call_state {
289         struct ctdb_client *client;
290         uint32_t reqid;
291         struct ctdb_call *call;
292         struct timeval start_time;
293
294         /* readonly request ? */
295         uint32_t readonly_fetch;
296         uint32_t client_callid;
297 };
298
299 /* 
300    complete a call from a client 
301 */
302 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
303 {
304         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
305                                                            struct daemon_call_state);
306         struct ctdb_reply_call *r;
307         int res;
308         uint32_t length;
309         struct ctdb_client *client = dstate->client;
310         struct ctdb_db_context *ctdb_db = state->ctdb_db;
311
312         talloc_steal(client, dstate);
313         talloc_steal(dstate, dstate->call);
314
315         res = ctdb_daemon_call_recv(state, dstate->call);
316         if (res != 0) {
317                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
318                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
319
320                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
321                 return;
322         }
323
324         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
325         /* If the client asked for readonly FETCH, we remapped this to 
326            FETCH_WITH_HEADER when calling the daemon. So we must
327            strip the extra header off the reply data before passing
328            it back to the client.
329         */
330         if (dstate->readonly_fetch
331         && dstate->client_callid == CTDB_FETCH_FUNC) {
332                 length -= sizeof(struct ctdb_ltdb_header);
333         }
334
335         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
336                                length, struct ctdb_reply_call);
337         if (r == NULL) {
338                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
339                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
340                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
341                 return;
342         }
343         r->hdr.reqid        = dstate->reqid;
344         r->status           = dstate->call->status;
345
346         if (dstate->readonly_fetch
347         && dstate->client_callid == CTDB_FETCH_FUNC) {
348                 /* client only asked for a FETCH so we must strip off
349                    the extra ctdb_ltdb header
350                 */
351                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
352                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
353         } else {
354                 r->datalen          = dstate->call->reply_data.dsize;
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
356         }
357
358         res = daemon_queue_send(client, &r->hdr);
359         if (res == -1) {
360                 /* client is dead - return immediately */
361                 return;
362         }
363         if (res != 0) {
364                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
365         }
366         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
367         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
368         talloc_free(dstate);
369 }
370
371 struct ctdb_daemon_packet_wrap {
372         struct ctdb_context *ctdb;
373         uint32_t client_id;
374 };
375
376 /*
377   a wrapper to catch disconnected clients
378  */
379 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
380 {
381         struct ctdb_client *client;
382         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
383                                                             struct ctdb_daemon_packet_wrap);
384         if (w == NULL) {
385                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
386                 return;
387         }
388
389         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
390         if (client == NULL) {
391                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
392                          w->client_id));
393                 talloc_free(w);
394                 return;
395         }
396         talloc_free(w);
397
398         /* process it */
399         daemon_incoming_packet(client, hdr);    
400 }
401
402 struct ctdb_deferred_fetch_call {
403         struct ctdb_deferred_fetch_call *next, *prev;
404         struct ctdb_req_call *c;
405         struct ctdb_daemon_packet_wrap *w;
406 };
407
408 struct ctdb_deferred_fetch_queue {
409         struct ctdb_deferred_fetch_call *deferred_calls;
410 };
411
412 struct ctdb_deferred_requeue {
413         struct ctdb_deferred_fetch_call *dfc;
414         struct ctdb_client *client;
415 };
416
417 /* called from a timer event and starts reprocessing the deferred call.*/
418 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
419                                        struct timeval t, void *private_data)
420 {
421         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
422         struct ctdb_client *client = dfr->client;
423
424         talloc_steal(client, dfr->dfc->c);
425         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
426         talloc_free(dfr);
427 }
428
429 /* the referral context is destroyed either after a timeout or when the initial
430    fetch-lock has finished.
431    at this stage, immediately start reprocessing the queued up deferred
432    calls so they get reprocessed immediately (and since we are dmaster at
433    this stage, trigger the waiting smbd processes to pick up and aquire the
434    record right away.
435 */
436 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
437 {
438
439         /* need to reprocess the packets from the queue explicitely instead of
440            just using a normal destructor since we want, need, to
441            call the clients in the same oder as the requests queued up
442         */
443         while (dfq->deferred_calls != NULL) {
444                 struct ctdb_client *client;
445                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
446                 struct ctdb_deferred_requeue *dfr;
447
448                 DLIST_REMOVE(dfq->deferred_calls, dfc);
449
450                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
451                 if (client == NULL) {
452                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
453                                  dfc->w->client_id));
454                         continue;
455                 }
456
457                 /* process it by pushing it back onto the eventloop */
458                 dfr = talloc(client, struct ctdb_deferred_requeue);
459                 if (dfr == NULL) {
460                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
461                         continue;
462                 }
463
464                 dfr->dfc    = talloc_steal(dfr, dfc);
465                 dfr->client = client;
466
467                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
468         }
469
470         return 0;
471 }
472
473 /* insert the new deferral context into the rb tree.
474    there should never be a pre-existing context here, but check for it
475    warn and destroy the previous context if there is already a deferral context
476    for this key.
477 */
478 static void *insert_dfq_callback(void *parm, void *data)
479 {
480         if (data) {
481                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
482                 talloc_free(data);
483         }
484         return parm;
485 }
486
487 /* if the original fetch-lock did not complete within a reasonable time,
488    free the context and context for all deferred requests to cause them to be
489    re-inserted into the event system.
490 */
491 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
492                                   struct timeval t, void *private_data)
493 {
494         talloc_free(private_data);
495 }
496
497 /* This function is used in the local daemon to register a KEY in a database
498    for being "fetched"
499    While the remote fetch is in-flight, any futher attempts to re-fetch the
500    same record will be deferred until the fetch completes.
501 */
502 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
503 {
504         uint32_t *k;
505         struct ctdb_deferred_fetch_queue *dfq;
506
507         k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
508         if (k == NULL) {
509                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
510                 return -1;
511         }
512
513         k[0] = (call->key.dsize + 3) / 4 + 1;
514         memcpy(&k[1], call->key.dptr, call->key.dsize);
515
516         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
517         if (dfq == NULL) {
518                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
519                 talloc_free(k);
520                 return -1;
521         }
522         dfq->deferred_calls = NULL;
523
524         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
525
526         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
527
528         /* if the fetch havent completed in 30 seconds, just tear it all down
529            and let it try again as the events are reissued */
530         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
531
532         talloc_free(k);
533         return 0;
534 }
535
536 /* check if this is a duplicate request to a fetch already in-flight
537    if it is, make this call deferred to be reprocessed later when
538    the in-flight fetch completes.
539 */
540 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
541 {
542         uint32_t *k;
543         struct ctdb_deferred_fetch_queue *dfq;
544         struct ctdb_deferred_fetch_call *dfc;
545
546         k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
547         if (k == NULL) {
548                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
549                 return -1;
550         }
551
552         k[0] = (key.dsize + 3) / 4 + 1;
553         memcpy(&k[1], key.dptr, key.dsize);
554
555         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
556         if (dfq == NULL) {
557                 talloc_free(k);
558                 return -1;
559         }
560
561
562         talloc_free(k);
563
564         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
565         if (dfc == NULL) {
566                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
567                 return -1;
568         }
569
570         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
571         if (dfc->w == NULL) {
572                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
573                 talloc_free(dfc);
574                 return -1;
575         }
576
577         dfc->c = talloc_steal(dfc, c);
578         dfc->w->ctdb = ctdb_db->ctdb;
579         dfc->w->client_id = client->client_id;
580
581         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
582
583         return 0;
584 }
585
586
587 /*
588   this is called when the ctdb daemon received a ctdb request call
589   from a local client over the unix domain socket
590  */
591 static void daemon_request_call_from_client(struct ctdb_client *client, 
592                                             struct ctdb_req_call *c)
593 {
594         struct ctdb_call_state *state;
595         struct ctdb_db_context *ctdb_db;
596         struct daemon_call_state *dstate;
597         struct ctdb_call *call;
598         struct ctdb_ltdb_header header;
599         TDB_DATA key, data;
600         int ret;
601         struct ctdb_context *ctdb = client->ctdb;
602         struct ctdb_daemon_packet_wrap *w;
603
604         CTDB_INCREMENT_STAT(ctdb, total_calls);
605         CTDB_DECREMENT_STAT(ctdb, pending_calls);
606
607         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
608         if (!ctdb_db) {
609                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
610                           c->db_id));
611                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
612                 return;
613         }
614
615         if (ctdb_db->unhealthy_reason) {
616                 /*
617                  * this is just a warning, as the tdb should be empty anyway,
618                  * and only persistent databases can be unhealthy, which doesn't
619                  * use this code patch
620                  */
621                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
622                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
623         }
624
625         key.dptr = c->data;
626         key.dsize = c->keylen;
627
628         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
629         CTDB_NO_MEMORY_VOID(ctdb, w);   
630
631         w->ctdb = ctdb;
632         w->client_id = client->client_id;
633
634         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
635                                            (struct ctdb_req_header *)c, &data,
636                                            daemon_incoming_packet_wrap, w, true);
637         if (ret == -2) {
638                 /* will retry later */
639                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
640                 return;
641         }
642
643         talloc_free(w);
644
645         if (ret != 0) {
646                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
647                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
648                 return;
649         }
650
651
652         /* check if this fetch request is a duplicate for a
653            request we already have in flight. If so defer it until
654            the first request completes.
655         */
656         if (ctdb->tunable.fetch_collapse == 1) {
657                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
658                         ret = ctdb_ltdb_unlock(ctdb_db, key);
659                         if (ret != 0) {
660                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
661                         }
662                         return;
663                 }
664         }
665
666         /* Dont do READONLY if we dont have a tracking database */
667         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
668                 c->flags &= ~CTDB_WANT_READONLY;
669         }
670
671         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
672                 header.flags &= ~CTDB_REC_RO_FLAGS;
673                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
674                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
675                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
676                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
677                 }
678                 /* and clear out the tracking data */
679                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
680                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
681                 }
682         }
683
684         /* if we are revoking, we must defer all other calls until the revoke
685          * had completed.
686          */
687         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
688                 talloc_free(data.dptr);
689                 ret = ctdb_ltdb_unlock(ctdb_db, key);
690
691                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
692                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
693                 }
694                 return;
695         }
696
697         if ((header.dmaster == ctdb->pnn)
698         && (!(c->flags & CTDB_WANT_READONLY))
699         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
700                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
701                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
702                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
703                 }
704                 ret = ctdb_ltdb_unlock(ctdb_db, key);
705
706                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
707                         ctdb_fatal(ctdb, "Failed to start record revoke");
708                 }
709                 talloc_free(data.dptr);
710
711                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
712                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
713                 }
714
715                 return;
716         }               
717
718         dstate = talloc(client, struct daemon_call_state);
719         if (dstate == NULL) {
720                 ret = ctdb_ltdb_unlock(ctdb_db, key);
721                 if (ret != 0) {
722                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
723                 }
724
725                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
726                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
727                 return;
728         }
729         dstate->start_time = timeval_current();
730         dstate->client = client;
731         dstate->reqid  = c->hdr.reqid;
732         talloc_steal(dstate, data.dptr);
733
734         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
735         if (call == NULL) {
736                 ret = ctdb_ltdb_unlock(ctdb_db, key);
737                 if (ret != 0) {
738                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
739                 }
740
741                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
742                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
743                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
744                 return;
745         }
746
747         dstate->readonly_fetch = 0;
748         call->call_id = c->callid;
749         call->key = key;
750         call->call_data.dptr = c->data + c->keylen;
751         call->call_data.dsize = c->calldatalen;
752         call->flags = c->flags;
753
754         if (c->flags & CTDB_WANT_READONLY) {
755                 /* client wants readonly record, so translate this into a 
756                    fetch with header. remember what the client asked for
757                    so we can remap the reply back to the proper format for
758                    the client in the reply
759                  */
760                 dstate->client_callid = call->call_id;
761                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
762                 dstate->readonly_fetch = 1;
763         }
764
765         if (header.dmaster == ctdb->pnn) {
766                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
767         } else {
768                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
769                 if (ctdb->tunable.fetch_collapse == 1) {
770                         /* This request triggered a remote fetch-lock.
771                            set up a deferral for this key so any additional
772                            fetch-locks are deferred until the current one
773                            finishes.
774                          */
775                         setup_deferred_fetch_locks(ctdb_db, call);
776                 }
777         }
778
779         ret = ctdb_ltdb_unlock(ctdb_db, key);
780         if (ret != 0) {
781                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
782         }
783
784         if (state == NULL) {
785                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
786                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
787                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
788                 return;
789         }
790         talloc_steal(state, dstate);
791         talloc_steal(client, state);
792
793         state->async.fn = daemon_call_from_client_callback;
794         state->async.private_data = dstate;
795 }
796
797
798 static void daemon_request_control_from_client(struct ctdb_client *client, 
799                                                struct ctdb_req_control *c);
800
801 /* data contains a packet from the client */
802 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
803 {
804         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
805         TALLOC_CTX *tmp_ctx;
806         struct ctdb_context *ctdb = client->ctdb;
807
808         /* place the packet as a child of a tmp_ctx. We then use
809            talloc_free() below to free it. If any of the calls want
810            to keep it, then they will steal it somewhere else, and the
811            talloc_free() will be a no-op */
812         tmp_ctx = talloc_new(client);
813         talloc_steal(tmp_ctx, hdr);
814
815         if (hdr->ctdb_magic != CTDB_MAGIC) {
816                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
817                 goto done;
818         }
819
820         if (hdr->ctdb_version != CTDB_VERSION) {
821                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
822                 goto done;
823         }
824
825         switch (hdr->operation) {
826         case CTDB_REQ_CALL:
827                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
828                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
829                 break;
830
831         case CTDB_REQ_MESSAGE:
832                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
833                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
834                 break;
835
836         case CTDB_REQ_CONTROL:
837                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
838                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
839                 break;
840
841         default:
842                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
843                          hdr->operation));
844         }
845
846 done:
847         talloc_free(tmp_ctx);
848 }
849
850 /*
851   called when the daemon gets a incoming packet
852  */
853 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
854 {
855         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
856         struct ctdb_req_header *hdr;
857
858         if (cnt == 0) {
859                 talloc_free(client);
860                 return;
861         }
862
863         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
864
865         if (cnt < sizeof(*hdr)) {
866                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
867                                (unsigned)cnt);
868                 return;
869         }
870         hdr = (struct ctdb_req_header *)data;
871         if (cnt != hdr->length) {
872                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
873                                (unsigned)hdr->length, (unsigned)cnt);
874                 return;
875         }
876
877         if (hdr->ctdb_magic != CTDB_MAGIC) {
878                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
879                 return;
880         }
881
882         if (hdr->ctdb_version != CTDB_VERSION) {
883                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
884                 return;
885         }
886
887         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
888                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
889                  hdr->srcnode, hdr->destnode));
890
891         /* it is the responsibility of the incoming packet function to free 'data' */
892         daemon_incoming_packet(client, hdr);
893 }
894
895
896 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
897 {
898         if (client_pid->ctdb->client_pids != NULL) {
899                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
900         }
901
902         return 0;
903 }
904
905
906 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
907                          uint16_t flags, void *private_data)
908 {
909         struct sockaddr_un addr;
910         socklen_t len;
911         int fd;
912         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
913         struct ctdb_client *client;
914         struct ctdb_client_pid_list *client_pid;
915         pid_t peer_pid = 0;
916
917         memset(&addr, 0, sizeof(addr));
918         len = sizeof(addr);
919         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
920         if (fd == -1) {
921                 return;
922         }
923
924         set_nonblocking(fd);
925         set_close_on_exec(fd);
926
927         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
928
929         client = talloc_zero(ctdb, struct ctdb_client);
930         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
931                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
932         }
933
934         client->ctdb = ctdb;
935         client->fd = fd;
936         client->client_id = ctdb_reqid_new(ctdb, client);
937         client->pid = peer_pid;
938
939         client_pid = talloc(client, struct ctdb_client_pid_list);
940         if (client_pid == NULL) {
941                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
942                 close(fd);
943                 talloc_free(client);
944                 return;
945         }               
946         client_pid->ctdb   = ctdb;
947         client_pid->pid    = peer_pid;
948         client_pid->client = client;
949
950         DLIST_ADD(ctdb->client_pids, client_pid);
951
952         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
953                                          ctdb_daemon_read_cb, client,
954                                          "client-%u", client->pid);
955
956         talloc_set_destructor(client, ctdb_client_destructor);
957         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
958         ctdb->num_clients++;
959 }
960
961
962
963 /*
964   create a unix domain socket and bind it
965   return a file descriptor open on the socket 
966 */
967 static int ux_socket_bind(struct ctdb_context *ctdb)
968 {
969         struct sockaddr_un addr;
970
971         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
972         if (ctdb->daemon.sd == -1) {
973                 return -1;
974         }
975
976         memset(&addr, 0, sizeof(addr));
977         addr.sun_family = AF_UNIX;
978         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
979
980         /* First check if an old ctdbd might be running */
981         if (connect(ctdb->daemon.sd,
982                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
983                 DEBUG(DEBUG_CRIT,
984                       ("Something is already listening on ctdb socket '%s'\n",
985                        ctdb->daemon.name));
986                 goto failed;
987         }
988
989         /* Remove any old socket */
990         unlink(ctdb->daemon.name);
991
992         set_close_on_exec(ctdb->daemon.sd);
993         set_nonblocking(ctdb->daemon.sd);
994
995         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
996                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
997                 goto failed;
998         }
999
1000         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1001             chmod(ctdb->daemon.name, 0700) != 0) {
1002                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1003                 goto failed;
1004         }
1005
1006
1007         if (listen(ctdb->daemon.sd, 100) != 0) {
1008                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1009                 goto failed;
1010         }
1011
1012         return 0;
1013
1014 failed:
1015         close(ctdb->daemon.sd);
1016         ctdb->daemon.sd = -1;
1017         return -1;      
1018 }
1019
1020 static void initialise_node_flags (struct ctdb_context *ctdb)
1021 {
1022         if (ctdb->pnn == -1) {
1023                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1024         }
1025
1026         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1027
1028         /* do we start out in DISABLED mode? */
1029         if (ctdb->start_as_disabled != 0) {
1030                 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1031                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1032         }
1033         /* do we start out in STOPPED mode? */
1034         if (ctdb->start_as_stopped != 0) {
1035                 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1036                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1037         }
1038 }
1039
1040 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1041                                       void *private_data)
1042 {
1043         if (status != 0) {
1044                 ctdb_die(ctdb, "Failed to run setup event");
1045         }
1046         ctdb_run_notification_script(ctdb, "setup");
1047
1048         /* tell all other nodes we've just started up */
1049         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1050                                  0, CTDB_CONTROL_STARTUP, 0,
1051                                  CTDB_CTRL_FLAG_NOREPLY,
1052                                  tdb_null, NULL, NULL);
1053
1054         /* Start the recovery daemon */
1055         if (ctdb_start_recoverd(ctdb) != 0) {
1056                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1057                 exit(11);
1058         }
1059
1060         ctdb_start_periodic_events(ctdb);
1061
1062         ctdb_wait_for_first_recovery(ctdb);
1063 }
1064
1065 static struct timeval tevent_before_wait_ts;
1066 static struct timeval tevent_after_wait_ts;
1067
1068 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1069                               void *private_data)
1070 {
1071         struct timeval diff;
1072         struct timeval now;
1073         struct ctdb_context *ctdb =
1074                 talloc_get_type(private_data, struct ctdb_context);
1075
1076         if (getpid() != ctdb->ctdbd_pid) {
1077                 return;
1078         }
1079
1080         now = timeval_current();
1081
1082         switch (tp) {
1083         case TEVENT_TRACE_BEFORE_WAIT:
1084                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1085                         diff = timeval_until(&tevent_after_wait_ts, &now);
1086                         if (diff.tv_sec > 3) {
1087                                 DEBUG(DEBUG_ERR,
1088                                       ("Handling event took %ld seconds!\n",
1089                                        diff.tv_sec));
1090                         }
1091                 }
1092                 tevent_before_wait_ts = now;
1093                 break;
1094
1095         case TEVENT_TRACE_AFTER_WAIT:
1096                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1097                         diff = timeval_until(&tevent_before_wait_ts, &now);
1098                         if (diff.tv_sec > 3) {
1099                                 DEBUG(DEBUG_CRIT,
1100                                       ("No event for %ld seconds!\n",
1101                                        diff.tv_sec));
1102                         }
1103                 }
1104                 tevent_after_wait_ts = now;
1105                 break;
1106
1107         default:
1108                 /* Do nothing for future tevent trace points */ ;
1109         }
1110 }
1111
1112 static void ctdb_remove_pidfile(void)
1113 {
1114         /* Only the main ctdbd's PID matches the SID */
1115         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1116                 if (unlink(ctdbd_pidfile) == 0) {
1117                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1118                                              ctdbd_pidfile));
1119                 } else {
1120                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1121                                               ctdbd_pidfile));
1122                 }
1123         }
1124 }
1125
1126 static void ctdb_create_pidfile(pid_t pid)
1127 {
1128         if (ctdbd_pidfile != NULL) {
1129                 FILE *fp;
1130
1131                 fp = fopen(ctdbd_pidfile, "w");
1132                 if (fp == NULL) {
1133                         DEBUG(DEBUG_ALERT,
1134                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1135                         exit(11);
1136                 }
1137
1138                 fprintf(fp, "%d\n", pid);
1139                 fclose(fp);
1140                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1141                 atexit(ctdb_remove_pidfile);
1142         }
1143 }
1144
1145 /*
1146   start the protocol going as a daemon
1147 */
1148 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
1149 {
1150         int res, ret = -1;
1151         struct fd_event *fde;
1152         const char *domain_socket_name;
1153
1154         /* create a unix domain stream socket to listen to */
1155         res = ux_socket_bind(ctdb);
1156         if (res!=0) {
1157                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1158                 exit(10);
1159         }
1160
1161         if (do_fork && fork()) {
1162                 return 0;
1163         }
1164
1165         tdb_reopen_all(false);
1166
1167         if (do_fork) {
1168                 if (setsid() == -1) {
1169                         ctdb_die(ctdb, "Failed to setsid()\n");
1170                 }
1171                 close(0);
1172                 if (open("/dev/null", O_RDONLY) != 0) {
1173                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1174                         exit(11);
1175                 }
1176         }
1177         ignore_signal(SIGPIPE);
1178
1179         ctdb->ctdbd_pid = getpid();
1180         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1181                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1182         ctdb_create_pidfile(ctdb->ctdbd_pid);
1183
1184         /* Make sure we log something when the daemon terminates.
1185          * This must be the first exit handler to run (so the last to
1186          * be registered.
1187          */
1188         atexit(print_exit_message);
1189
1190         if (ctdb->do_setsched) {
1191                 /* try to set us up as realtime */
1192                 set_scheduler();
1193         }
1194
1195         /* ensure the socket is deleted on exit of the daemon */
1196         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1197         if (domain_socket_name == NULL) {
1198                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1199                 exit(12);
1200         }
1201
1202         ctdb->ev = event_context_init(NULL);
1203         tevent_loop_allow_nesting(ctdb->ev);
1204         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1205         ret = ctdb_init_tevent_logging(ctdb);
1206         if (ret != 0) {
1207                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1208                 exit(1);
1209         }
1210
1211         /* set up a handler to pick up sigchld */
1212         if (ctdb_init_sigchld(ctdb) == NULL) {
1213                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1214                 exit(1);
1215         }
1216
1217         ctdb_set_child_logging(ctdb);
1218         if (use_syslog) {
1219                 if (start_syslog_daemon(ctdb)) {
1220                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1221                         exit(10);
1222                 }
1223         }
1224
1225         /* initialize statistics collection */
1226         ctdb_statistics_init(ctdb);
1227
1228         /* force initial recovery for election */
1229         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1230
1231         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1232         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1233         if (ret != 0) {
1234                 ctdb_die(ctdb, "Failed to run init event\n");
1235         }
1236         ctdb_run_notification_script(ctdb, "init");
1237
1238         if (strcmp(ctdb->transport, "tcp") == 0) {
1239                 int ctdb_tcp_init(struct ctdb_context *);
1240                 ret = ctdb_tcp_init(ctdb);
1241         }
1242 #ifdef USE_INFINIBAND
1243         if (strcmp(ctdb->transport, "ib") == 0) {
1244                 int ctdb_ibw_init(struct ctdb_context *);
1245                 ret = ctdb_ibw_init(ctdb);
1246         }
1247 #endif
1248         if (ret != 0) {
1249                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1250                 return -1;
1251         }
1252
1253         if (ctdb->methods == NULL) {
1254                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1255                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1256         }
1257
1258         /* initialise the transport  */
1259         if (ctdb->methods->initialise(ctdb) != 0) {
1260                 ctdb_fatal(ctdb, "transport failed to initialise");
1261         }
1262
1263         initialise_node_flags(ctdb);
1264
1265         if (ctdb->public_addresses_file) {
1266                 ret = ctdb_set_public_addresses(ctdb, true);
1267                 if (ret == -1) {
1268                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1269                         exit(1);
1270                 }
1271                 if (ctdb->do_checkpublicip) {
1272                         ctdb_start_monitoring_interfaces(ctdb);
1273                 }
1274         }
1275
1276
1277         /* attach to existing databases */
1278         if (ctdb_attach_databases(ctdb) != 0) {
1279                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1280         }
1281
1282         /* start frozen, then let the first election sort things out */
1283         if (!ctdb_blocking_freeze(ctdb)) {
1284                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1285         }
1286
1287         /* now start accepting clients, only can do this once frozen */
1288         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1289                            EVENT_FD_READ,
1290                            ctdb_accept_client, ctdb);
1291         if (fde == NULL) {
1292                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1293         }
1294         tevent_fd_set_auto_close(fde);
1295
1296         /* release any IPs we hold from previous runs of the daemon */
1297         if (ctdb->tunable.disable_ip_failover == 0) {
1298                 ctdb_release_all_ips(ctdb);
1299         }
1300
1301         /* Start the transport */
1302         if (ctdb->methods->start(ctdb) != 0) {
1303                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1304                 ctdb_fatal(ctdb, "transport failed to start");
1305         }
1306
1307         /* Recovery daemon and timed events are started from the
1308          * callback, only after the setup event completes
1309          * successfully.
1310          */
1311         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1312         ret = ctdb_event_script_callback(ctdb,
1313                                          ctdb,
1314                                          ctdb_setup_event_callback,
1315                                          ctdb,
1316                                          CTDB_EVENT_SETUP,
1317                                          "%s",
1318                                          "");
1319         if (ret != 0) {
1320                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1321                 exit(1);
1322         }
1323
1324         lockdown_memory(ctdb->valgrinding);
1325
1326         /* go into a wait loop to allow other nodes to complete */
1327         event_loop_wait(ctdb->ev);
1328
1329         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1330         exit(1);
1331 }
1332
1333 /*
1334   allocate a packet for use in daemon<->daemon communication
1335  */
1336 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1337                                                  TALLOC_CTX *mem_ctx, 
1338                                                  enum ctdb_operation operation, 
1339                                                  size_t length, size_t slength,
1340                                                  const char *type)
1341 {
1342         int size;
1343         struct ctdb_req_header *hdr;
1344
1345         length = MAX(length, slength);
1346         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1347
1348         if (ctdb->methods == NULL) {
1349                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1350                          operation, (unsigned)length));
1351                 return NULL;
1352         }
1353
1354         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1355         if (hdr == NULL) {
1356                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1357                          operation, (unsigned)length));
1358                 return NULL;
1359         }
1360         talloc_set_name_const(hdr, type);
1361         memset(hdr, 0, slength);
1362         hdr->length       = length;
1363         hdr->operation    = operation;
1364         hdr->ctdb_magic   = CTDB_MAGIC;
1365         hdr->ctdb_version = CTDB_VERSION;
1366         hdr->generation   = ctdb->vnn_map->generation;
1367         hdr->srcnode      = ctdb->pnn;
1368
1369         return hdr;     
1370 }
1371
1372 struct daemon_control_state {
1373         struct daemon_control_state *next, *prev;
1374         struct ctdb_client *client;
1375         struct ctdb_req_control *c;
1376         uint32_t reqid;
1377         struct ctdb_node *node;
1378 };
1379
1380 /*
1381   callback when a control reply comes in
1382  */
1383 static void daemon_control_callback(struct ctdb_context *ctdb,
1384                                     int32_t status, TDB_DATA data, 
1385                                     const char *errormsg,
1386                                     void *private_data)
1387 {
1388         struct daemon_control_state *state = talloc_get_type(private_data, 
1389                                                              struct daemon_control_state);
1390         struct ctdb_client *client = state->client;
1391         struct ctdb_reply_control *r;
1392         size_t len;
1393         int ret;
1394
1395         /* construct a message to send to the client containing the data */
1396         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1397         if (errormsg) {
1398                 len += strlen(errormsg);
1399         }
1400         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1401                                struct ctdb_reply_control);
1402         CTDB_NO_MEMORY_VOID(ctdb, r);
1403
1404         r->hdr.reqid     = state->reqid;
1405         r->status        = status;
1406         r->datalen       = data.dsize;
1407         r->errorlen = 0;
1408         memcpy(&r->data[0], data.dptr, data.dsize);
1409         if (errormsg) {
1410                 r->errorlen = strlen(errormsg);
1411                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1412         }
1413
1414         ret = daemon_queue_send(client, &r->hdr);
1415         if (ret != -1) {
1416                 talloc_free(state);
1417         }
1418 }
1419
1420 /*
1421   fail all pending controls to a disconnected node
1422  */
1423 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1424 {
1425         struct daemon_control_state *state;
1426         while ((state = node->pending_controls)) {
1427                 DLIST_REMOVE(node->pending_controls, state);
1428                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1429                                         "node is disconnected", state);
1430         }
1431 }
1432
1433 /*
1434   destroy a daemon_control_state
1435  */
1436 static int daemon_control_destructor(struct daemon_control_state *state)
1437 {
1438         if (state->node) {
1439                 DLIST_REMOVE(state->node->pending_controls, state);
1440         }
1441         return 0;
1442 }
1443
1444 /*
1445   this is called when the ctdb daemon received a ctdb request control
1446   from a local client over the unix domain socket
1447  */
1448 static void daemon_request_control_from_client(struct ctdb_client *client, 
1449                                                struct ctdb_req_control *c)
1450 {
1451         TDB_DATA data;
1452         int res;
1453         struct daemon_control_state *state;
1454         TALLOC_CTX *tmp_ctx = talloc_new(client);
1455
1456         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1457                 c->hdr.destnode = client->ctdb->pnn;
1458         }
1459
1460         state = talloc(client, struct daemon_control_state);
1461         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1462
1463         state->client = client;
1464         state->c = talloc_steal(state, c);
1465         state->reqid = c->hdr.reqid;
1466         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1467                 state->node = client->ctdb->nodes[c->hdr.destnode];
1468                 DLIST_ADD(state->node->pending_controls, state);
1469         } else {
1470                 state->node = NULL;
1471         }
1472
1473         talloc_set_destructor(state, daemon_control_destructor);
1474
1475         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1476                 talloc_steal(tmp_ctx, state);
1477         }
1478         
1479         data.dptr = &c->data[0];
1480         data.dsize = c->datalen;
1481         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1482                                        c->srvid, c->opcode, client->client_id,
1483                                        c->flags,
1484                                        data, daemon_control_callback,
1485                                        state);
1486         if (res != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1488                          c->hdr.destnode));
1489         }
1490
1491         talloc_free(tmp_ctx);
1492 }
1493
1494 /*
1495   register a call function
1496 */
1497 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1498                          ctdb_fn_t fn, int id)
1499 {
1500         struct ctdb_registered_call *call;
1501         struct ctdb_db_context *ctdb_db;
1502
1503         ctdb_db = find_ctdb_db(ctdb, db_id);
1504         if (ctdb_db == NULL) {
1505                 return -1;
1506         }
1507
1508         call = talloc(ctdb_db, struct ctdb_registered_call);
1509         call->fn = fn;
1510         call->id = id;
1511
1512         DLIST_ADD(ctdb_db->calls, call);        
1513         return 0;
1514 }
1515
1516
1517
1518 /*
1519   this local messaging handler is ugly, but is needed to prevent
1520   recursion in ctdb_send_message() when the destination node is the
1521   same as the source node
1522  */
1523 struct ctdb_local_message {
1524         struct ctdb_context *ctdb;
1525         uint64_t srvid;
1526         TDB_DATA data;
1527 };
1528
1529 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1530                                        struct timeval t, void *private_data)
1531 {
1532         struct ctdb_local_message *m = talloc_get_type(private_data, 
1533                                                        struct ctdb_local_message);
1534         int res;
1535
1536         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1537         if (res != 0) {
1538                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1539                           (unsigned long long)m->srvid));
1540         }
1541         talloc_free(m);
1542 }
1543
1544 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1545 {
1546         struct ctdb_local_message *m;
1547         m = talloc(ctdb, struct ctdb_local_message);
1548         CTDB_NO_MEMORY(ctdb, m);
1549
1550         m->ctdb = ctdb;
1551         m->srvid = srvid;
1552         m->data  = data;
1553         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1554         if (m->data.dptr == NULL) {
1555                 talloc_free(m);
1556                 return -1;
1557         }
1558
1559         /* this needs to be done as an event to prevent recursion */
1560         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1561         return 0;
1562 }
1563
1564 /*
1565   send a ctdb message
1566 */
1567 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1568                              uint64_t srvid, TDB_DATA data)
1569 {
1570         struct ctdb_req_message *r;
1571         int len;
1572
1573         if (ctdb->methods == NULL) {
1574                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1575                 return -1;
1576         }
1577
1578         /* see if this is a message to ourselves */
1579         if (pnn == ctdb->pnn) {
1580                 return ctdb_local_message(ctdb, srvid, data);
1581         }
1582
1583         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1584         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1585                                     struct ctdb_req_message);
1586         CTDB_NO_MEMORY(ctdb, r);
1587
1588         r->hdr.destnode  = pnn;
1589         r->srvid         = srvid;
1590         r->datalen       = data.dsize;
1591         memcpy(&r->data[0], data.dptr, data.dsize);
1592
1593         ctdb_queue_packet(ctdb, &r->hdr);
1594
1595         talloc_free(r);
1596         return 0;
1597 }
1598
1599
1600
1601 struct ctdb_client_notify_list {
1602         struct ctdb_client_notify_list *next, *prev;
1603         struct ctdb_context *ctdb;
1604         uint64_t srvid;
1605         TDB_DATA data;
1606 };
1607
1608
1609 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1610 {
1611         int ret;
1612
1613         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1614
1615         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1616         if (ret != 0) {
1617                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1618         }
1619
1620         return 0;
1621 }
1622
1623 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1624 {
1625         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1626         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1627         struct ctdb_client_notify_list *nl;
1628
1629         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1630
1631         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1632                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1633                 return -1;
1634         }
1635
1636         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1637                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1638                 return -1;
1639         }
1640
1641
1642         if (client == NULL) {
1643                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1644                 return -1;
1645         }
1646
1647         for(nl=client->notify; nl; nl=nl->next) {
1648                 if (nl->srvid == notify->srvid) {
1649                         break;
1650                 }
1651         }
1652         if (nl != NULL) {
1653                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1654                 return -1;
1655         }
1656
1657         nl = talloc(client, struct ctdb_client_notify_list);
1658         CTDB_NO_MEMORY(ctdb, nl);
1659         nl->ctdb       = ctdb;
1660         nl->srvid      = notify->srvid;
1661         nl->data.dsize = notify->len;
1662         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1663         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1664         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1665         
1666         DLIST_ADD(client->notify, nl);
1667         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1668
1669         return 0;
1670 }
1671
1672 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1673 {
1674         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1675         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1676         struct ctdb_client_notify_list *nl;
1677
1678         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1679
1680         if (client == NULL) {
1681                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1682                 return -1;
1683         }
1684
1685         for(nl=client->notify; nl; nl=nl->next) {
1686                 if (nl->srvid == notify->srvid) {
1687                         break;
1688                 }
1689         }
1690         if (nl == NULL) {
1691                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1692                 return -1;
1693         }
1694
1695         DLIST_REMOVE(client->notify, nl);
1696         talloc_set_destructor(nl, NULL);
1697         talloc_free(nl);
1698
1699         return 0;
1700 }
1701
1702 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1703 {
1704         struct ctdb_client_pid_list *client_pid;
1705
1706         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1707                 if (client_pid->pid == pid) {
1708                         return client_pid->client;
1709                 }
1710         }
1711         return NULL;
1712 }
1713
1714
1715 /* This control is used by samba when probing if a process (of a samba daemon)
1716    exists on the node.
1717    Samba does this when it needs/wants to check if a subrecord in one of the
1718    databases is still valied, or if it is stale and can be removed.
1719    If the node is in unhealthy or stopped state we just kill of the samba
1720    process holding htis sub-record and return to the calling samba that
1721    the process does not exist.
1722    This allows us to forcefully recall subrecords registered by samba processes
1723    on banned and stopped nodes.
1724 */
1725 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1726 {
1727         struct ctdb_client *client;
1728
1729         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1730                 client = ctdb_find_client_by_pid(ctdb, pid);
1731                 if (client != NULL) {
1732                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1733                         talloc_free(client);
1734                 }
1735                 return -1;
1736         }
1737
1738         return kill(pid, 0);
1739 }
1740
1741 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1742 {
1743         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1744                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1745                 return;
1746         }
1747
1748         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1749         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1750         ctdb_stop_recoverd(ctdb);
1751         ctdb_stop_keepalive(ctdb);
1752         ctdb_stop_monitoring(ctdb);
1753         ctdb_release_all_ips(ctdb);
1754         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1755         if (ctdb->methods != NULL) {
1756                 ctdb->methods->shutdown(ctdb);
1757         }
1758
1759         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1760         exit(exit_code);
1761 }