recoverd: Rebalancing should be done regardless tunable
[ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "tdb.h"
23 #include "lib/util/dlinklist.h"
24 #include "system/network.h"
25 #include "system/filesys.h"
26 #include "system/wait.h"
27 #include "../include/ctdb_version.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include "../common/rb_tree.h"
31 #include <sys/socket.h>
32
33 struct ctdb_client_pid_list {
34         struct ctdb_client_pid_list *next, *prev;
35         struct ctdb_context *ctdb;
36         pid_t pid;
37         struct ctdb_client *client;
38 };
39
40 const char *ctdbd_pidfile = NULL;
41
42 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
43
44 static void print_exit_message(void)
45 {
46         if (debug_extra != NULL && debug_extra[0] != '\0') {
47                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
48         } else {
49                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
50
51                 /* Wait a second to allow pending log messages to be flushed */
52                 sleep(1);
53         }
54 }
55
56
57
58 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
59                                   struct timeval t, void *private_data)
60 {
61         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
62
63         if (getpid() != ctdbd_pid) {
64                 return;
65         }
66
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72 /* Used to trigger a dummy event once per second, to make
73  * detection of hangs more reliable.
74  */
75 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
76 {
77         event_add_timed(ctdb->ev, ctdb, 
78                         timeval_current_ofs(1, 0), 
79                         ctdb_time_tick, ctdb);
80 }
81
82 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
83 {
84         /* start monitoring for connected/disconnected nodes */
85         ctdb_start_keepalive(ctdb);
86
87         /* start monitoring for node health */
88         ctdb_start_monitoring(ctdb);
89
90         /* start periodic update of tcp tickle lists */
91         ctdb_start_tcp_tickle_update(ctdb);
92
93         /* start listening for recovery daemon pings */
94         ctdb_control_recd_ping(ctdb);
95
96         /* start listening to timer ticks */
97         ctdb_start_time_tickd(ctdb);
98 }
99
100 static void block_signal(int signum)
101 {
102         struct sigaction act;
103
104         memset(&act, 0, sizeof(act));
105
106         act.sa_handler = SIG_IGN;
107         sigemptyset(&act.sa_mask);
108         sigaddset(&act.sa_mask, signum);
109         sigaction(signum, &act, NULL);
110 }
111
112
113 /*
114   send a packet to a client
115  */
116 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
117 {
118         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
119         if (hdr->operation == CTDB_REQ_MESSAGE) {
120                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
121                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
122                         talloc_free(client);
123                         return -1;
124                 }
125         }
126         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
127 }
128
129 /*
130   message handler for when we are in daemon mode. This redirects the message
131   to the right client
132  */
133 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
134                                     TDB_DATA data, void *private_data)
135 {
136         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
137         struct ctdb_req_message *r;
138         int len;
139
140         /* construct a message to send to the client containing the data */
141         len = offsetof(struct ctdb_req_message, data) + data.dsize;
142         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
143                                len, struct ctdb_req_message);
144         CTDB_NO_MEMORY_VOID(ctdb, r);
145
146         talloc_set_name_const(r, "req_message packet");
147
148         r->srvid         = srvid;
149         r->datalen       = data.dsize;
150         memcpy(&r->data[0], data.dptr, data.dsize);
151
152         daemon_queue_send(client, &r->hdr);
153
154         talloc_free(r);
155 }
156
157 /*
158   this is called when the ctdb daemon received a ctdb request to 
159   set the srvid from the client
160  */
161 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
162 {
163         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
164         int res;
165         if (client == NULL) {
166                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
167                 return -1;
168         }
169         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
170         if (res != 0) {
171                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
172                          (unsigned long long)srvid));
173         } else {
174                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
175                          (unsigned long long)srvid));
176         }
177
178         return res;
179 }
180
181 /*
182   this is called when the ctdb daemon received a ctdb request to 
183   remove a srvid from the client
184  */
185 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
186 {
187         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
188         if (client == NULL) {
189                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
190                 return -1;
191         }
192         return ctdb_deregister_message_handler(ctdb, srvid, client);
193 }
194
195 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
196                         TDB_DATA *outdata)
197 {
198         uint64_t *ids;
199         int i, num_ids;
200         uint8_t *results;
201
202         if ((indata.dsize % sizeof(uint64_t)) != 0) {
203                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
204                                   "size=%d\n", (int)indata.dsize));
205                 return -1;
206         }
207
208         ids = (uint64_t *)indata.dptr;
209         num_ids = indata.dsize / 8;
210
211         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
212         if (results == NULL) {
213                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
214                 return -1;
215         }
216         for (i=0; i<num_ids; i++) {
217                 if (ctdb_check_message_handler(ctdb, ids[i])) {
218                         results[i/8] |= (1 << (i%8));
219                 }
220         }
221         outdata->dptr = (uint8_t *)results;
222         outdata->dsize = talloc_get_size(results);
223         return 0;
224 }
225
226 /*
227   destroy a ctdb_client
228 */
229 static int ctdb_client_destructor(struct ctdb_client *client)
230 {
231         struct ctdb_db_context *ctdb_db;
232
233         ctdb_takeover_client_destructor_hook(client);
234         ctdb_reqid_remove(client->ctdb, client->client_id);
235         client->ctdb->num_clients--;
236
237         if (client->num_persistent_updates != 0) {
238                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
239                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
240         }
241         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
242         if (ctdb_db) {
243                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
244                                   "commit active. Forcing recovery.\n"));
245                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
246
247                 /*
248                  * trans3 transaction state:
249                  *
250                  * The destructor sets the pointer to NULL.
251                  */
252                 talloc_free(ctdb_db->persistent_state);
253         }
254
255         return 0;
256 }
257
258
259 /*
260   this is called when the ctdb daemon received a ctdb request message
261   from a local client over the unix domain socket
262  */
263 static void daemon_request_message_from_client(struct ctdb_client *client, 
264                                                struct ctdb_req_message *c)
265 {
266         TDB_DATA data;
267         int res;
268
269         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
270                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
271         }
272
273         /* maybe the message is for another client on this node */
274         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
275                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
276                 return;
277         }
278
279         /* its for a remote node */
280         data.dptr = &c->data[0];
281         data.dsize = c->datalen;
282         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
283                                        c->srvid, data);
284         if (res != 0) {
285                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
286                          c->hdr.destnode));
287         }
288 }
289
290
291 struct daemon_call_state {
292         struct ctdb_client *client;
293         uint32_t reqid;
294         struct ctdb_call *call;
295         struct timeval start_time;
296
297         /* readonly request ? */
298         uint32_t readonly_fetch;
299         uint32_t client_callid;
300 };
301
302 /* 
303    complete a call from a client 
304 */
305 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
306 {
307         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
308                                                            struct daemon_call_state);
309         struct ctdb_reply_call *r;
310         int res;
311         uint32_t length;
312         struct ctdb_client *client = dstate->client;
313         struct ctdb_db_context *ctdb_db = state->ctdb_db;
314
315         talloc_steal(client, dstate);
316         talloc_steal(dstate, dstate->call);
317
318         res = ctdb_daemon_call_recv(state, dstate->call);
319         if (res != 0) {
320                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
321                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
322
323                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
324                 return;
325         }
326
327         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
328         /* If the client asked for readonly FETCH, we remapped this to 
329            FETCH_WITH_HEADER when calling the daemon. So we must
330            strip the extra header off the reply data before passing
331            it back to the client.
332         */
333         if (dstate->readonly_fetch
334         && dstate->client_callid == CTDB_FETCH_FUNC) {
335                 length -= sizeof(struct ctdb_ltdb_header);
336         }
337
338         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
339                                length, struct ctdb_reply_call);
340         if (r == NULL) {
341                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
342                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
343                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
344                 return;
345         }
346         r->hdr.reqid        = dstate->reqid;
347         r->status           = dstate->call->status;
348
349         if (dstate->readonly_fetch
350         && dstate->client_callid == CTDB_FETCH_FUNC) {
351                 /* client only asked for a FETCH so we must strip off
352                    the extra ctdb_ltdb header
353                 */
354                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
355                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
356         } else {
357                 r->datalen          = dstate->call->reply_data.dsize;
358                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
359         }
360
361         res = daemon_queue_send(client, &r->hdr);
362         if (res == -1) {
363                 /* client is dead - return immediately */
364                 return;
365         }
366         if (res != 0) {
367                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
368         }
369         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
370         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
371         talloc_free(dstate);
372 }
373
374 struct ctdb_daemon_packet_wrap {
375         struct ctdb_context *ctdb;
376         uint32_t client_id;
377 };
378
379 /*
380   a wrapper to catch disconnected clients
381  */
382 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
383 {
384         struct ctdb_client *client;
385         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
386                                                             struct ctdb_daemon_packet_wrap);
387         if (w == NULL) {
388                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
389                 return;
390         }
391
392         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
393         if (client == NULL) {
394                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
395                          w->client_id));
396                 talloc_free(w);
397                 return;
398         }
399         talloc_free(w);
400
401         /* process it */
402         daemon_incoming_packet(client, hdr);    
403 }
404
405 struct ctdb_deferred_fetch_call {
406         struct ctdb_deferred_fetch_call *next, *prev;
407         struct ctdb_req_call *c;
408         struct ctdb_daemon_packet_wrap *w;
409 };
410
411 struct ctdb_deferred_fetch_queue {
412         struct ctdb_deferred_fetch_call *deferred_calls;
413 };
414
415 struct ctdb_deferred_requeue {
416         struct ctdb_deferred_fetch_call *dfc;
417         struct ctdb_client *client;
418 };
419
420 /* called from a timer event and starts reprocessing the deferred call.*/
421 static void reprocess_deferred_call(struct event_context *ev, struct timed_event *te, 
422                                        struct timeval t, void *private_data)
423 {
424         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
425         struct ctdb_client *client = dfr->client;
426
427         talloc_steal(client, dfr->dfc->c);
428         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
429         talloc_free(dfr);
430 }
431
432 /* the referral context is destroyed either after a timeout or when the initial
433    fetch-lock has finished.
434    at this stage, immediately start reprocessing the queued up deferred
435    calls so they get reprocessed immediately (and since we are dmaster at
436    this stage, trigger the waiting smbd processes to pick up and aquire the
437    record right away.
438 */
439 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
440 {
441
442         /* need to reprocess the packets from the queue explicitely instead of
443            just using a normal destructor since we want, need, to
444            call the clients in the same oder as the requests queued up
445         */
446         while (dfq->deferred_calls != NULL) {
447                 struct ctdb_client *client;
448                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
449                 struct ctdb_deferred_requeue *dfr;
450
451                 DLIST_REMOVE(dfq->deferred_calls, dfc);
452
453                 client = ctdb_reqid_find(dfc->w->ctdb, dfc->w->client_id, struct ctdb_client);
454                 if (client == NULL) {
455                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
456                                  dfc->w->client_id));
457                         continue;
458                 }
459
460                 /* process it by pushing it back onto the eventloop */
461                 dfr = talloc(client, struct ctdb_deferred_requeue);
462                 if (dfr == NULL) {
463                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
464                         continue;
465                 }
466
467                 dfr->dfc    = talloc_steal(dfr, dfc);
468                 dfr->client = client;
469
470                 event_add_timed(dfc->w->ctdb->ev, client, timeval_zero(), reprocess_deferred_call, dfr);
471         }
472
473         return 0;
474 }
475
476 /* insert the new deferral context into the rb tree.
477    there should never be a pre-existing context here, but check for it
478    warn and destroy the previous context if there is already a deferral context
479    for this key.
480 */
481 static void *insert_dfq_callback(void *parm, void *data)
482 {
483         if (data) {
484                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
485                 talloc_free(data);
486         }
487         return parm;
488 }
489
490 /* if the original fetch-lock did not complete within a reasonable time,
491    free the context and context for all deferred requests to cause them to be
492    re-inserted into the event system.
493 */
494 static void dfq_timeout(struct event_context *ev, struct timed_event *te, 
495                                   struct timeval t, void *private_data)
496 {
497         talloc_free(private_data);
498 }
499
500 /* This function is used in the local daemon to register a KEY in a database
501    for being "fetched"
502    While the remote fetch is in-flight, any futher attempts to re-fetch the
503    same record will be deferred until the fetch completes.
504 */
505 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
506 {
507         uint32_t *k;
508         struct ctdb_deferred_fetch_queue *dfq;
509
510         k = talloc_zero_size(call, ((call->key.dsize + 3) & 0xfffffffc) + 4);
511         if (k == NULL) {
512                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
513                 return -1;
514         }
515
516         k[0] = (call->key.dsize + 3) / 4 + 1;
517         memcpy(&k[1], call->key.dptr, call->key.dsize);
518
519         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
520         if (dfq == NULL) {
521                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
522                 talloc_free(k);
523                 return -1;
524         }
525         dfq->deferred_calls = NULL;
526
527         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
528
529         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
530
531         /* if the fetch havent completed in 30 seconds, just tear it all down
532            and let it try again as the events are reissued */
533         event_add_timed(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0), dfq_timeout, dfq);
534
535         talloc_free(k);
536         return 0;
537 }
538
539 /* check if this is a duplicate request to a fetch already in-flight
540    if it is, make this call deferred to be reprocessed later when
541    the in-flight fetch completes.
542 */
543 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call *c)
544 {
545         uint32_t *k;
546         struct ctdb_deferred_fetch_queue *dfq;
547         struct ctdb_deferred_fetch_call *dfc;
548
549         k = talloc_zero_size(c, ((key.dsize + 3) & 0xfffffffc) + 4);
550         if (k == NULL) {
551                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
552                 return -1;
553         }
554
555         k[0] = (key.dsize + 3) / 4 + 1;
556         memcpy(&k[1], key.dptr, key.dsize);
557
558         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
559         if (dfq == NULL) {
560                 talloc_free(k);
561                 return -1;
562         }
563
564
565         talloc_free(k);
566
567         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
568         if (dfc == NULL) {
569                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
570                 return -1;
571         }
572
573         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
574         if (dfc->w == NULL) {
575                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
576                 talloc_free(dfc);
577                 return -1;
578         }
579
580         dfc->c = talloc_steal(dfc, c);
581         dfc->w->ctdb = ctdb_db->ctdb;
582         dfc->w->client_id = client->client_id;
583
584         DLIST_ADD_END(dfq->deferred_calls, dfc, NULL);
585
586         return 0;
587 }
588
589
590 /*
591   this is called when the ctdb daemon received a ctdb request call
592   from a local client over the unix domain socket
593  */
594 static void daemon_request_call_from_client(struct ctdb_client *client, 
595                                             struct ctdb_req_call *c)
596 {
597         struct ctdb_call_state *state;
598         struct ctdb_db_context *ctdb_db;
599         struct daemon_call_state *dstate;
600         struct ctdb_call *call;
601         struct ctdb_ltdb_header header;
602         TDB_DATA key, data;
603         int ret;
604         struct ctdb_context *ctdb = client->ctdb;
605         struct ctdb_daemon_packet_wrap *w;
606
607         CTDB_INCREMENT_STAT(ctdb, total_calls);
608         CTDB_DECREMENT_STAT(ctdb, pending_calls);
609
610         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
611         if (!ctdb_db) {
612                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
613                           c->db_id));
614                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
615                 return;
616         }
617
618         if (ctdb_db->unhealthy_reason) {
619                 /*
620                  * this is just a warning, as the tdb should be empty anyway,
621                  * and only persistent databases can be unhealthy, which doesn't
622                  * use this code patch
623                  */
624                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
625                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
626         }
627
628         key.dptr = c->data;
629         key.dsize = c->keylen;
630
631         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
632         CTDB_NO_MEMORY_VOID(ctdb, w);   
633
634         w->ctdb = ctdb;
635         w->client_id = client->client_id;
636
637         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
638                                            (struct ctdb_req_header *)c, &data,
639                                            daemon_incoming_packet_wrap, w, true);
640         if (ret == -2) {
641                 /* will retry later */
642                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
643                 return;
644         }
645
646         talloc_free(w);
647
648         if (ret != 0) {
649                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
650                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
651                 return;
652         }
653
654
655         /* check if this fetch request is a duplicate for a
656            request we already have in flight. If so defer it until
657            the first request completes.
658         */
659         if (ctdb->tunable.fetch_collapse == 1) {
660                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
661                         ret = ctdb_ltdb_unlock(ctdb_db, key);
662                         if (ret != 0) {
663                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
664                         }
665                         return;
666                 }
667         }
668
669         /* Dont do READONLY if we dont have a tracking database */
670         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
671                 c->flags &= ~CTDB_WANT_READONLY;
672         }
673
674         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
675                 header.flags &= ~CTDB_REC_RO_FLAGS;
676                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
677                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
678                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
679                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
680                 }
681                 /* and clear out the tracking data */
682                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
683                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
684                 }
685         }
686
687         /* if we are revoking, we must defer all other calls until the revoke
688          * had completed.
689          */
690         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
691                 talloc_free(data.dptr);
692                 ret = ctdb_ltdb_unlock(ctdb_db, key);
693
694                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
695                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
696                 }
697                 return;
698         }
699
700         if ((header.dmaster == ctdb->pnn)
701         && (!(c->flags & CTDB_WANT_READONLY))
702         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
703                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
704                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
705                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
706                 }
707                 ret = ctdb_ltdb_unlock(ctdb_db, key);
708
709                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
710                         ctdb_fatal(ctdb, "Failed to start record revoke");
711                 }
712                 talloc_free(data.dptr);
713
714                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
715                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
716                 }
717
718                 return;
719         }               
720
721         dstate = talloc(client, struct daemon_call_state);
722         if (dstate == NULL) {
723                 ret = ctdb_ltdb_unlock(ctdb_db, key);
724                 if (ret != 0) {
725                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
726                 }
727
728                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
729                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
730                 return;
731         }
732         dstate->start_time = timeval_current();
733         dstate->client = client;
734         dstate->reqid  = c->hdr.reqid;
735         talloc_steal(dstate, data.dptr);
736
737         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
738         if (call == NULL) {
739                 ret = ctdb_ltdb_unlock(ctdb_db, key);
740                 if (ret != 0) {
741                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
742                 }
743
744                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
745                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
746                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
747                 return;
748         }
749
750         dstate->readonly_fetch = 0;
751         call->call_id = c->callid;
752         call->key = key;
753         call->call_data.dptr = c->data + c->keylen;
754         call->call_data.dsize = c->calldatalen;
755         call->flags = c->flags;
756
757         if (c->flags & CTDB_WANT_READONLY) {
758                 /* client wants readonly record, so translate this into a 
759                    fetch with header. remember what the client asked for
760                    so we can remap the reply back to the proper format for
761                    the client in the reply
762                  */
763                 dstate->client_callid = call->call_id;
764                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
765                 dstate->readonly_fetch = 1;
766         }
767
768         if (header.dmaster == ctdb->pnn) {
769                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
770         } else {
771                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
772                 if (ctdb->tunable.fetch_collapse == 1) {
773                         /* This request triggered a remote fetch-lock.
774                            set up a deferral for this key so any additional
775                            fetch-locks are deferred until the current one
776                            finishes.
777                          */
778                         setup_deferred_fetch_locks(ctdb_db, call);
779                 }
780         }
781
782         ret = ctdb_ltdb_unlock(ctdb_db, key);
783         if (ret != 0) {
784                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
785         }
786
787         if (state == NULL) {
788                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
789                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
790                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
791                 return;
792         }
793         talloc_steal(state, dstate);
794         talloc_steal(client, state);
795
796         state->async.fn = daemon_call_from_client_callback;
797         state->async.private_data = dstate;
798 }
799
800
801 static void daemon_request_control_from_client(struct ctdb_client *client, 
802                                                struct ctdb_req_control *c);
803
804 /* data contains a packet from the client */
805 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
806 {
807         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
808         TALLOC_CTX *tmp_ctx;
809         struct ctdb_context *ctdb = client->ctdb;
810
811         /* place the packet as a child of a tmp_ctx. We then use
812            talloc_free() below to free it. If any of the calls want
813            to keep it, then they will steal it somewhere else, and the
814            talloc_free() will be a no-op */
815         tmp_ctx = talloc_new(client);
816         talloc_steal(tmp_ctx, hdr);
817
818         if (hdr->ctdb_magic != CTDB_MAGIC) {
819                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
820                 goto done;
821         }
822
823         if (hdr->ctdb_version != CTDB_VERSION) {
824                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
825                 goto done;
826         }
827
828         switch (hdr->operation) {
829         case CTDB_REQ_CALL:
830                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
831                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
832                 break;
833
834         case CTDB_REQ_MESSAGE:
835                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
836                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
837                 break;
838
839         case CTDB_REQ_CONTROL:
840                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
841                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
842                 break;
843
844         default:
845                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
846                          hdr->operation));
847         }
848
849 done:
850         talloc_free(tmp_ctx);
851 }
852
853 /*
854   called when the daemon gets a incoming packet
855  */
856 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
857 {
858         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
859         struct ctdb_req_header *hdr;
860
861         if (cnt == 0) {
862                 talloc_free(client);
863                 return;
864         }
865
866         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
867
868         if (cnt < sizeof(*hdr)) {
869                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
870                                (unsigned)cnt);
871                 return;
872         }
873         hdr = (struct ctdb_req_header *)data;
874         if (cnt != hdr->length) {
875                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
876                                (unsigned)hdr->length, (unsigned)cnt);
877                 return;
878         }
879
880         if (hdr->ctdb_magic != CTDB_MAGIC) {
881                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
882                 return;
883         }
884
885         if (hdr->ctdb_version != CTDB_VERSION) {
886                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
887                 return;
888         }
889
890         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
891                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
892                  hdr->srcnode, hdr->destnode));
893
894         /* it is the responsibility of the incoming packet function to free 'data' */
895         daemon_incoming_packet(client, hdr);
896 }
897
898
899 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
900 {
901         if (client_pid->ctdb->client_pids != NULL) {
902                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
903         }
904
905         return 0;
906 }
907
908
909 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
910                          uint16_t flags, void *private_data)
911 {
912         struct sockaddr_un addr;
913         socklen_t len;
914         int fd;
915         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
916         struct ctdb_client *client;
917         struct ctdb_client_pid_list *client_pid;
918         pid_t peer_pid = 0;
919
920         memset(&addr, 0, sizeof(addr));
921         len = sizeof(addr);
922         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
923         if (fd == -1) {
924                 return;
925         }
926
927         set_nonblocking(fd);
928         set_close_on_exec(fd);
929
930         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
931
932         client = talloc_zero(ctdb, struct ctdb_client);
933         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
934                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
935         }
936
937         client->ctdb = ctdb;
938         client->fd = fd;
939         client->client_id = ctdb_reqid_new(ctdb, client);
940         client->pid = peer_pid;
941
942         client_pid = talloc(client, struct ctdb_client_pid_list);
943         if (client_pid == NULL) {
944                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
945                 close(fd);
946                 talloc_free(client);
947                 return;
948         }               
949         client_pid->ctdb   = ctdb;
950         client_pid->pid    = peer_pid;
951         client_pid->client = client;
952
953         DLIST_ADD(ctdb->client_pids, client_pid);
954
955         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
956                                          ctdb_daemon_read_cb, client,
957                                          "client-%u", client->pid);
958
959         talloc_set_destructor(client, ctdb_client_destructor);
960         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
961         ctdb->num_clients++;
962 }
963
964
965
966 /*
967   create a unix domain socket and bind it
968   return a file descriptor open on the socket 
969 */
970 static int ux_socket_bind(struct ctdb_context *ctdb)
971 {
972         struct sockaddr_un addr;
973
974         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
975         if (ctdb->daemon.sd == -1) {
976                 return -1;
977         }
978
979         memset(&addr, 0, sizeof(addr));
980         addr.sun_family = AF_UNIX;
981         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
982
983         /* First check if an old ctdbd might be running */
984         if (connect(ctdb->daemon.sd,
985                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
986                 DEBUG(DEBUG_CRIT,
987                       ("Something is already listening on ctdb socket '%s'\n",
988                        ctdb->daemon.name));
989                 goto failed;
990         }
991
992         /* Remove any old socket */
993         unlink(ctdb->daemon.name);
994
995         set_close_on_exec(ctdb->daemon.sd);
996         set_nonblocking(ctdb->daemon.sd);
997
998         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
999                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1000                 goto failed;
1001         }
1002
1003         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1004             chmod(ctdb->daemon.name, 0700) != 0) {
1005                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1006                 goto failed;
1007         }
1008
1009
1010         if (listen(ctdb->daemon.sd, 100) != 0) {
1011                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1012                 goto failed;
1013         }
1014
1015         return 0;
1016
1017 failed:
1018         close(ctdb->daemon.sd);
1019         ctdb->daemon.sd = -1;
1020         return -1;      
1021 }
1022
1023 static void initialise_node_flags (struct ctdb_context *ctdb)
1024 {
1025         if (ctdb->pnn == -1) {
1026                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1027         }
1028
1029         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1030
1031         /* do we start out in DISABLED mode? */
1032         if (ctdb->start_as_disabled != 0) {
1033                 DEBUG(DEBUG_INFO, ("This node is configured to start in DISABLED state\n"));
1034                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1035         }
1036         /* do we start out in STOPPED mode? */
1037         if (ctdb->start_as_stopped != 0) {
1038                 DEBUG(DEBUG_INFO, ("This node is configured to start in STOPPED state\n"));
1039                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1040         }
1041 }
1042
1043 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1044                                       void *private_data)
1045 {
1046         if (status != 0) {
1047                 ctdb_die(ctdb, "Failed to run setup event");
1048         }
1049         ctdb_run_notification_script(ctdb, "setup");
1050
1051         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_FIRST_RECOVERY);
1052
1053         /* tell all other nodes we've just started up */
1054         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1055                                  0, CTDB_CONTROL_STARTUP, 0,
1056                                  CTDB_CTRL_FLAG_NOREPLY,
1057                                  tdb_null, NULL, NULL);
1058
1059         /* Start the recovery daemon */
1060         if (ctdb_start_recoverd(ctdb) != 0) {
1061                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1062                 exit(11);
1063         }
1064
1065         ctdb_start_periodic_events(ctdb);
1066 }
1067
1068 static struct timeval tevent_before_wait_ts;
1069 static struct timeval tevent_after_wait_ts;
1070
1071 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1072                               void *private_data)
1073 {
1074         struct timeval diff;
1075         struct timeval now;
1076
1077         if (getpid() != ctdbd_pid) {
1078                 return;
1079         }
1080
1081         now = timeval_current();
1082
1083         switch (tp) {
1084         case TEVENT_TRACE_BEFORE_WAIT:
1085                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1086                         diff = timeval_until(&tevent_after_wait_ts, &now);
1087                         if (diff.tv_sec > 3) {
1088                                 DEBUG(DEBUG_ERR,
1089                                       ("Handling event took %ld seconds!\n",
1090                                        diff.tv_sec));
1091                         }
1092                 }
1093                 tevent_before_wait_ts = now;
1094                 break;
1095
1096         case TEVENT_TRACE_AFTER_WAIT:
1097                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1098                         diff = timeval_until(&tevent_before_wait_ts, &now);
1099                         if (diff.tv_sec > 3) {
1100                                 DEBUG(DEBUG_CRIT,
1101                                       ("No event for %ld seconds!\n",
1102                                        diff.tv_sec));
1103                         }
1104                 }
1105                 tevent_after_wait_ts = now;
1106                 break;
1107
1108         default:
1109                 /* Do nothing for future tevent trace points */ ;
1110         }
1111 }
1112
1113 static void ctdb_remove_pidfile(void)
1114 {
1115         if (ctdbd_pidfile != NULL && !ctdb_is_child_process()) {
1116                 if (unlink(ctdbd_pidfile) == 0) {
1117                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1118                                              ctdbd_pidfile));
1119                 } else {
1120                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1121                                               ctdbd_pidfile));
1122                 }
1123         }
1124 }
1125
1126 static void ctdb_create_pidfile(pid_t pid)
1127 {
1128         if (ctdbd_pidfile != NULL) {
1129                 FILE *fp;
1130
1131                 fp = fopen(ctdbd_pidfile, "w");
1132                 if (fp == NULL) {
1133                         DEBUG(DEBUG_ALERT,
1134                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1135                         exit(11);
1136                 }
1137
1138                 fprintf(fp, "%d\n", pid);
1139                 fclose(fp);
1140                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1141                 atexit(ctdb_remove_pidfile);
1142         }
1143 }
1144
1145 /*
1146   start the protocol going as a daemon
1147 */
1148 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog)
1149 {
1150         int res, ret = -1;
1151         struct fd_event *fde;
1152         const char *domain_socket_name;
1153
1154         /* create a unix domain stream socket to listen to */
1155         res = ux_socket_bind(ctdb);
1156         if (res!=0) {
1157                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1158                 exit(10);
1159         }
1160
1161         if (do_fork && fork()) {
1162                 return 0;
1163         }
1164
1165         tdb_reopen_all(false);
1166
1167         if (do_fork) {
1168                 setsid();
1169                 close(0);
1170                 if (open("/dev/null", O_RDONLY) != 0) {
1171                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1172                         exit(11);
1173                 }
1174         }
1175         block_signal(SIGPIPE);
1176
1177         ctdbd_pid = getpid();
1178         ctdb->ctdbd_pid = ctdbd_pid;
1179         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1180                           CTDB_VERSION_STRING, ctdbd_pid));
1181         ctdb_create_pidfile(ctdb->ctdbd_pid);
1182
1183         /* Make sure we log something when the daemon terminates.
1184          * This must be the first exit handler to run (so the last to
1185          * be registered.
1186          */
1187         atexit(print_exit_message);
1188
1189         if (ctdb->do_setsched) {
1190                 /* try to set us up as realtime */
1191                 ctdb_set_scheduler(ctdb);
1192         }
1193
1194         /* ensure the socket is deleted on exit of the daemon */
1195         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
1196         if (domain_socket_name == NULL) {
1197                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
1198                 exit(12);
1199         }
1200
1201         ctdb->ev = event_context_init(NULL);
1202         tevent_loop_allow_nesting(ctdb->ev);
1203         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, NULL);
1204         ret = ctdb_init_tevent_logging(ctdb);
1205         if (ret != 0) {
1206                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1207                 exit(1);
1208         }
1209
1210         /* set up a handler to pick up sigchld */
1211         if (ctdb_init_sigchld(ctdb) == NULL) {
1212                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1213                 exit(1);
1214         }
1215
1216         ctdb_set_child_logging(ctdb);
1217         if (use_syslog) {
1218                 if (start_syslog_daemon(ctdb)) {
1219                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
1220                         exit(10);
1221                 }
1222         }
1223
1224         /* initialize statistics collection */
1225         ctdb_statistics_init(ctdb);
1226
1227         /* force initial recovery for election */
1228         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1229
1230         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1231         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1232         if (ret != 0) {
1233                 ctdb_die(ctdb, "Failed to run init event\n");
1234         }
1235         ctdb_run_notification_script(ctdb, "init");
1236
1237         if (strcmp(ctdb->transport, "tcp") == 0) {
1238                 int ctdb_tcp_init(struct ctdb_context *);
1239                 ret = ctdb_tcp_init(ctdb);
1240         }
1241 #ifdef USE_INFINIBAND
1242         if (strcmp(ctdb->transport, "ib") == 0) {
1243                 int ctdb_ibw_init(struct ctdb_context *);
1244                 ret = ctdb_ibw_init(ctdb);
1245         }
1246 #endif
1247         if (ret != 0) {
1248                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1249                 return -1;
1250         }
1251
1252         if (ctdb->methods == NULL) {
1253                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1254                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1255         }
1256
1257         /* initialise the transport  */
1258         if (ctdb->methods->initialise(ctdb) != 0) {
1259                 ctdb_fatal(ctdb, "transport failed to initialise");
1260         }
1261
1262         initialise_node_flags(ctdb);
1263
1264         if (ctdb->public_addresses_file) {
1265                 ret = ctdb_set_public_addresses(ctdb, true);
1266                 if (ret == -1) {
1267                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1268                         exit(1);
1269                 }
1270                 if (ctdb->do_checkpublicip) {
1271                         ctdb_start_monitoring_interfaces(ctdb);
1272                 }
1273         }
1274
1275
1276         /* attach to existing databases */
1277         if (ctdb_attach_databases(ctdb) != 0) {
1278                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1279         }
1280
1281         /* start frozen, then let the first election sort things out */
1282         if (!ctdb_blocking_freeze(ctdb)) {
1283                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1284         }
1285
1286         /* now start accepting clients, only can do this once frozen */
1287         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
1288                            EVENT_FD_READ,
1289                            ctdb_accept_client, ctdb);
1290         if (fde == NULL) {
1291                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1292         }
1293         tevent_fd_set_auto_close(fde);
1294
1295         /* release any IPs we hold from previous runs of the daemon */
1296         if (ctdb->tunable.disable_ip_failover == 0) {
1297                 ctdb_release_all_ips(ctdb);
1298         }
1299
1300         /* Start the transport */
1301         if (ctdb->methods->start(ctdb) != 0) {
1302                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1303                 ctdb_fatal(ctdb, "transport failed to start");
1304         }
1305
1306         /* Recovery daemon and timed events are started from the
1307          * callback, only after the setup event completes
1308          * successfully.
1309          */
1310         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1311         ret = ctdb_event_script_callback(ctdb,
1312                                          ctdb,
1313                                          ctdb_setup_event_callback,
1314                                          ctdb,
1315                                          false,
1316                                          CTDB_EVENT_SETUP,
1317                                          "%s",
1318                                          "");
1319         if (ret != 0) {
1320                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1321                 exit(1);
1322         }
1323
1324         ctdb_lockdown_memory(ctdb);
1325           
1326         /* go into a wait loop to allow other nodes to complete */
1327         event_loop_wait(ctdb->ev);
1328
1329         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1330         exit(1);
1331 }
1332
1333 /*
1334   allocate a packet for use in daemon<->daemon communication
1335  */
1336 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1337                                                  TALLOC_CTX *mem_ctx, 
1338                                                  enum ctdb_operation operation, 
1339                                                  size_t length, size_t slength,
1340                                                  const char *type)
1341 {
1342         int size;
1343         struct ctdb_req_header *hdr;
1344
1345         length = MAX(length, slength);
1346         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1347
1348         if (ctdb->methods == NULL) {
1349                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1350                          operation, (unsigned)length));
1351                 return NULL;
1352         }
1353
1354         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1355         if (hdr == NULL) {
1356                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1357                          operation, (unsigned)length));
1358                 return NULL;
1359         }
1360         talloc_set_name_const(hdr, type);
1361         memset(hdr, 0, slength);
1362         hdr->length       = length;
1363         hdr->operation    = operation;
1364         hdr->ctdb_magic   = CTDB_MAGIC;
1365         hdr->ctdb_version = CTDB_VERSION;
1366         hdr->generation   = ctdb->vnn_map->generation;
1367         hdr->srcnode      = ctdb->pnn;
1368
1369         return hdr;     
1370 }
1371
1372 struct daemon_control_state {
1373         struct daemon_control_state *next, *prev;
1374         struct ctdb_client *client;
1375         struct ctdb_req_control *c;
1376         uint32_t reqid;
1377         struct ctdb_node *node;
1378 };
1379
1380 /*
1381   callback when a control reply comes in
1382  */
1383 static void daemon_control_callback(struct ctdb_context *ctdb,
1384                                     int32_t status, TDB_DATA data, 
1385                                     const char *errormsg,
1386                                     void *private_data)
1387 {
1388         struct daemon_control_state *state = talloc_get_type(private_data, 
1389                                                              struct daemon_control_state);
1390         struct ctdb_client *client = state->client;
1391         struct ctdb_reply_control *r;
1392         size_t len;
1393         int ret;
1394
1395         /* construct a message to send to the client containing the data */
1396         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1397         if (errormsg) {
1398                 len += strlen(errormsg);
1399         }
1400         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1401                                struct ctdb_reply_control);
1402         CTDB_NO_MEMORY_VOID(ctdb, r);
1403
1404         r->hdr.reqid     = state->reqid;
1405         r->status        = status;
1406         r->datalen       = data.dsize;
1407         r->errorlen = 0;
1408         memcpy(&r->data[0], data.dptr, data.dsize);
1409         if (errormsg) {
1410                 r->errorlen = strlen(errormsg);
1411                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1412         }
1413
1414         ret = daemon_queue_send(client, &r->hdr);
1415         if (ret != -1) {
1416                 talloc_free(state);
1417         }
1418 }
1419
1420 /*
1421   fail all pending controls to a disconnected node
1422  */
1423 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1424 {
1425         struct daemon_control_state *state;
1426         while ((state = node->pending_controls)) {
1427                 DLIST_REMOVE(node->pending_controls, state);
1428                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1429                                         "node is disconnected", state);
1430         }
1431 }
1432
1433 /*
1434   destroy a daemon_control_state
1435  */
1436 static int daemon_control_destructor(struct daemon_control_state *state)
1437 {
1438         if (state->node) {
1439                 DLIST_REMOVE(state->node->pending_controls, state);
1440         }
1441         return 0;
1442 }
1443
1444 /*
1445   this is called when the ctdb daemon received a ctdb request control
1446   from a local client over the unix domain socket
1447  */
1448 static void daemon_request_control_from_client(struct ctdb_client *client, 
1449                                                struct ctdb_req_control *c)
1450 {
1451         TDB_DATA data;
1452         int res;
1453         struct daemon_control_state *state;
1454         TALLOC_CTX *tmp_ctx = talloc_new(client);
1455
1456         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1457                 c->hdr.destnode = client->ctdb->pnn;
1458         }
1459
1460         state = talloc(client, struct daemon_control_state);
1461         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1462
1463         state->client = client;
1464         state->c = talloc_steal(state, c);
1465         state->reqid = c->hdr.reqid;
1466         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1467                 state->node = client->ctdb->nodes[c->hdr.destnode];
1468                 DLIST_ADD(state->node->pending_controls, state);
1469         } else {
1470                 state->node = NULL;
1471         }
1472
1473         talloc_set_destructor(state, daemon_control_destructor);
1474
1475         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1476                 talloc_steal(tmp_ctx, state);
1477         }
1478         
1479         data.dptr = &c->data[0];
1480         data.dsize = c->datalen;
1481         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1482                                        c->srvid, c->opcode, client->client_id,
1483                                        c->flags,
1484                                        data, daemon_control_callback,
1485                                        state);
1486         if (res != 0) {
1487                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1488                          c->hdr.destnode));
1489         }
1490
1491         talloc_free(tmp_ctx);
1492 }
1493
1494 /*
1495   register a call function
1496 */
1497 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1498                          ctdb_fn_t fn, int id)
1499 {
1500         struct ctdb_registered_call *call;
1501         struct ctdb_db_context *ctdb_db;
1502
1503         ctdb_db = find_ctdb_db(ctdb, db_id);
1504         if (ctdb_db == NULL) {
1505                 return -1;
1506         }
1507
1508         call = talloc(ctdb_db, struct ctdb_registered_call);
1509         call->fn = fn;
1510         call->id = id;
1511
1512         DLIST_ADD(ctdb_db->calls, call);        
1513         return 0;
1514 }
1515
1516
1517
1518 /*
1519   this local messaging handler is ugly, but is needed to prevent
1520   recursion in ctdb_send_message() when the destination node is the
1521   same as the source node
1522  */
1523 struct ctdb_local_message {
1524         struct ctdb_context *ctdb;
1525         uint64_t srvid;
1526         TDB_DATA data;
1527 };
1528
1529 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1530                                        struct timeval t, void *private_data)
1531 {
1532         struct ctdb_local_message *m = talloc_get_type(private_data, 
1533                                                        struct ctdb_local_message);
1534         int res;
1535
1536         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1537         if (res != 0) {
1538                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1539                           (unsigned long long)m->srvid));
1540         }
1541         talloc_free(m);
1542 }
1543
1544 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1545 {
1546         struct ctdb_local_message *m;
1547         m = talloc(ctdb, struct ctdb_local_message);
1548         CTDB_NO_MEMORY(ctdb, m);
1549
1550         m->ctdb = ctdb;
1551         m->srvid = srvid;
1552         m->data  = data;
1553         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1554         if (m->data.dptr == NULL) {
1555                 talloc_free(m);
1556                 return -1;
1557         }
1558
1559         /* this needs to be done as an event to prevent recursion */
1560         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1561         return 0;
1562 }
1563
1564 /*
1565   send a ctdb message
1566 */
1567 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1568                              uint64_t srvid, TDB_DATA data)
1569 {
1570         struct ctdb_req_message *r;
1571         int len;
1572
1573         if (ctdb->methods == NULL) {
1574                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1575                 return -1;
1576         }
1577
1578         /* see if this is a message to ourselves */
1579         if (pnn == ctdb->pnn) {
1580                 return ctdb_local_message(ctdb, srvid, data);
1581         }
1582
1583         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1584         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1585                                     struct ctdb_req_message);
1586         CTDB_NO_MEMORY(ctdb, r);
1587
1588         r->hdr.destnode  = pnn;
1589         r->srvid         = srvid;
1590         r->datalen       = data.dsize;
1591         memcpy(&r->data[0], data.dptr, data.dsize);
1592
1593         ctdb_queue_packet(ctdb, &r->hdr);
1594
1595         talloc_free(r);
1596         return 0;
1597 }
1598
1599
1600
1601 struct ctdb_client_notify_list {
1602         struct ctdb_client_notify_list *next, *prev;
1603         struct ctdb_context *ctdb;
1604         uint64_t srvid;
1605         TDB_DATA data;
1606 };
1607
1608
1609 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1610 {
1611         int ret;
1612
1613         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1614
1615         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1616         if (ret != 0) {
1617                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1618         }
1619
1620         return 0;
1621 }
1622
1623 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1624 {
1625         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1626         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1627         struct ctdb_client_notify_list *nl;
1628
1629         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1630
1631         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1632                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1633                 return -1;
1634         }
1635
1636         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1637                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1638                 return -1;
1639         }
1640
1641
1642         if (client == NULL) {
1643                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1644                 return -1;
1645         }
1646
1647         for(nl=client->notify; nl; nl=nl->next) {
1648                 if (nl->srvid == notify->srvid) {
1649                         break;
1650                 }
1651         }
1652         if (nl != NULL) {
1653                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1654                 return -1;
1655         }
1656
1657         nl = talloc(client, struct ctdb_client_notify_list);
1658         CTDB_NO_MEMORY(ctdb, nl);
1659         nl->ctdb       = ctdb;
1660         nl->srvid      = notify->srvid;
1661         nl->data.dsize = notify->len;
1662         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1663         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1664         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1665         
1666         DLIST_ADD(client->notify, nl);
1667         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1668
1669         return 0;
1670 }
1671
1672 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1673 {
1674         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1675         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1676         struct ctdb_client_notify_list *nl;
1677
1678         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1679
1680         if (client == NULL) {
1681                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1682                 return -1;
1683         }
1684
1685         for(nl=client->notify; nl; nl=nl->next) {
1686                 if (nl->srvid == notify->srvid) {
1687                         break;
1688                 }
1689         }
1690         if (nl == NULL) {
1691                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1692                 return -1;
1693         }
1694
1695         DLIST_REMOVE(client->notify, nl);
1696         talloc_set_destructor(nl, NULL);
1697         talloc_free(nl);
1698
1699         return 0;
1700 }
1701
1702 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1703 {
1704         struct ctdb_client_pid_list *client_pid;
1705
1706         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1707                 if (client_pid->pid == pid) {
1708                         return client_pid->client;
1709                 }
1710         }
1711         return NULL;
1712 }
1713
1714
1715 /* This control is used by samba when probing if a process (of a samba daemon)
1716    exists on the node.
1717    Samba does this when it needs/wants to check if a subrecord in one of the
1718    databases is still valied, or if it is stale and can be removed.
1719    If the node is in unhealthy or stopped state we just kill of the samba
1720    process holding htis sub-record and return to the calling samba that
1721    the process does not exist.
1722    This allows us to forcefully recall subrecords registered by samba processes
1723    on banned and stopped nodes.
1724 */
1725 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1726 {
1727         struct ctdb_client *client;
1728
1729         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1730                 client = ctdb_find_client_by_pid(ctdb, pid);
1731                 if (client != NULL) {
1732                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1733                         talloc_free(client);
1734                 }
1735                 return -1;
1736         }
1737
1738         return kill(pid, 0);
1739 }
1740
1741 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1742 {
1743         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1744                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1745                 return;
1746         }
1747
1748         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1749         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1750         ctdb_stop_recoverd(ctdb);
1751         ctdb_stop_keepalive(ctdb);
1752         ctdb_stop_monitoring(ctdb);
1753         ctdb_release_all_ips(ctdb);
1754         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1755         if (ctdb->methods != NULL) {
1756                 ctdb->methods->shutdown(ctdb);
1757         }
1758
1759         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1760         exit(exit_code);
1761 }