dlist: remove unneeded type argument from DLIST_ADD_END()
[samba.git] / ctdb / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/wait.h"
24 #include "system/time.h"
25
26 #include <talloc.h>
27 /* Allow use of deprecated function tevent_loop_allow_nesting() */
28 #define TEVENT_DEPRECATED
29 #include <tevent.h>
30 #include <tdb.h>
31
32 #include "lib/tdb_wrap/tdb_wrap.h"
33 #include "lib/util/dlinklist.h"
34 #include "lib/util/debug.h"
35 #include "lib/util/samba_util.h"
36
37 #include "ctdb_version.h"
38 #include "ctdb_private.h"
39 #include "ctdb_client.h"
40
41 #include "common/rb_tree.h"
42 #include "common/reqid.h"
43 #include "common/system.h"
44 #include "common/common.h"
45 #include "common/logging.h"
46
47 struct ctdb_client_pid_list {
48         struct ctdb_client_pid_list *next, *prev;
49         struct ctdb_context *ctdb;
50         pid_t pid;
51         struct ctdb_client *client;
52 };
53
54 const char *ctdbd_pidfile = NULL;
55
56 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
57
58 static void print_exit_message(void)
59 {
60         if (debug_extra != NULL && debug_extra[0] != '\0') {
61                 DEBUG(DEBUG_NOTICE,("CTDB %s shutting down\n", debug_extra));
62         } else {
63                 DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
64
65                 /* Wait a second to allow pending log messages to be flushed */
66                 sleep(1);
67         }
68 }
69
70
71
72 static void ctdb_time_tick(struct tevent_context *ev, struct tevent_timer *te,
73                                   struct timeval t, void *private_data)
74 {
75         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
76
77         if (getpid() != ctdb->ctdbd_pid) {
78                 return;
79         }
80
81         tevent_add_timer(ctdb->ev, ctdb,
82                          timeval_current_ofs(1, 0),
83                          ctdb_time_tick, ctdb);
84 }
85
86 /* Used to trigger a dummy event once per second, to make
87  * detection of hangs more reliable.
88  */
89 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
90 {
91         tevent_add_timer(ctdb->ev, ctdb,
92                          timeval_current_ofs(1, 0),
93                          ctdb_time_tick, ctdb);
94 }
95
96 static void ctdb_start_periodic_events(struct ctdb_context *ctdb)
97 {
98         /* start monitoring for connected/disconnected nodes */
99         ctdb_start_keepalive(ctdb);
100
101         /* start periodic update of tcp tickle lists */
102         ctdb_start_tcp_tickle_update(ctdb);
103
104         /* start listening for recovery daemon pings */
105         ctdb_control_recd_ping(ctdb);
106
107         /* start listening to timer ticks */
108         ctdb_start_time_tickd(ctdb);
109 }
110
111 static void ignore_signal(int signum)
112 {
113         struct sigaction act;
114
115         memset(&act, 0, sizeof(act));
116
117         act.sa_handler = SIG_IGN;
118         sigemptyset(&act.sa_mask);
119         sigaddset(&act.sa_mask, signum);
120         sigaction(signum, &act, NULL);
121 }
122
123
124 /*
125   send a packet to a client
126  */
127 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
128 {
129         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
130         if (hdr->operation == CTDB_REQ_MESSAGE) {
131                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
132                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
133                         talloc_free(client);
134                         return -1;
135                 }
136         }
137         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
138 }
139
140 /*
141   message handler for when we are in daemon mode. This redirects the message
142   to the right client
143  */
144 static void daemon_message_handler(uint64_t srvid, TDB_DATA data,
145                                    void *private_data)
146 {
147         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
148         struct ctdb_req_message_old *r;
149         int len;
150
151         /* construct a message to send to the client containing the data */
152         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
153         r = ctdbd_allocate_pkt(client->ctdb, client->ctdb, CTDB_REQ_MESSAGE,
154                                len, struct ctdb_req_message_old);
155         CTDB_NO_MEMORY_VOID(client->ctdb, r);
156
157         talloc_set_name_const(r, "req_message packet");
158
159         r->srvid         = srvid;
160         r->datalen       = data.dsize;
161         memcpy(&r->data[0], data.dptr, data.dsize);
162
163         daemon_queue_send(client, &r->hdr);
164
165         talloc_free(r);
166 }
167
168 /*
169   this is called when the ctdb daemon received a ctdb request to 
170   set the srvid from the client
171  */
172 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
173 {
174         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
175         int res;
176         if (client == NULL) {
177                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
178                 return -1;
179         }
180         res = srvid_register(ctdb->srv, client, srvid, daemon_message_handler,
181                              client);
182         if (res != 0) {
183                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
184                          (unsigned long long)srvid));
185         } else {
186                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
187                          (unsigned long long)srvid));
188         }
189
190         return res;
191 }
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   remove a srvid from the client
196  */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
200         if (client == NULL) {
201                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202                 return -1;
203         }
204         return srvid_deregister(ctdb->srv, srvid, client);
205 }
206
207 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
208                         TDB_DATA *outdata)
209 {
210         uint64_t *ids;
211         int i, num_ids;
212         uint8_t *results;
213
214         if ((indata.dsize % sizeof(uint64_t)) != 0) {
215                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
216                                   "size=%d\n", (int)indata.dsize));
217                 return -1;
218         }
219
220         ids = (uint64_t *)indata.dptr;
221         num_ids = indata.dsize / 8;
222
223         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
224         if (results == NULL) {
225                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
226                 return -1;
227         }
228         for (i=0; i<num_ids; i++) {
229                 if (srvid_exists(ctdb->srv, ids[i]) == 0) {
230                         results[i/8] |= (1 << (i%8));
231                 }
232         }
233         outdata->dptr = (uint8_t *)results;
234         outdata->dsize = talloc_get_size(results);
235         return 0;
236 }
237
238 /*
239   destroy a ctdb_client
240 */
241 static int ctdb_client_destructor(struct ctdb_client *client)
242 {
243         struct ctdb_db_context *ctdb_db;
244
245         ctdb_takeover_client_destructor_hook(client);
246         reqid_remove(client->ctdb->idr, client->client_id);
247         client->ctdb->num_clients--;
248
249         if (client->num_persistent_updates != 0) {
250                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
251                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
252         }
253         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
254         if (ctdb_db) {
255                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
256                                   "commit active. Forcing recovery.\n"));
257                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
258
259                 /*
260                  * trans3 transaction state:
261                  *
262                  * The destructor sets the pointer to NULL.
263                  */
264                 talloc_free(ctdb_db->persistent_state);
265         }
266
267         return 0;
268 }
269
270
271 /*
272   this is called when the ctdb daemon received a ctdb request message
273   from a local client over the unix domain socket
274  */
275 static void daemon_request_message_from_client(struct ctdb_client *client, 
276                                                struct ctdb_req_message_old *c)
277 {
278         TDB_DATA data;
279         int res;
280
281         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
282                 c->hdr.destnode = ctdb_get_pnn(client->ctdb);
283         }
284
285         /* maybe the message is for another client on this node */
286         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
287                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
288                 return;
289         }
290
291         /* its for a remote node */
292         data.dptr = &c->data[0];
293         data.dsize = c->datalen;
294         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
295                                        c->srvid, data);
296         if (res != 0) {
297                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
298                          c->hdr.destnode));
299         }
300 }
301
302
303 struct daemon_call_state {
304         struct ctdb_client *client;
305         uint32_t reqid;
306         struct ctdb_call *call;
307         struct timeval start_time;
308
309         /* readonly request ? */
310         uint32_t readonly_fetch;
311         uint32_t client_callid;
312 };
313
314 /* 
315    complete a call from a client 
316 */
317 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
318 {
319         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
320                                                            struct daemon_call_state);
321         struct ctdb_reply_call_old *r;
322         int res;
323         uint32_t length;
324         struct ctdb_client *client = dstate->client;
325         struct ctdb_db_context *ctdb_db = state->ctdb_db;
326
327         talloc_steal(client, dstate);
328         talloc_steal(dstate, dstate->call);
329
330         res = ctdb_daemon_call_recv(state, dstate->call);
331         if (res != 0) {
332                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
333                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
334
335                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
336                 return;
337         }
338
339         length = offsetof(struct ctdb_reply_call_old, data) + dstate->call->reply_data.dsize;
340         /* If the client asked for readonly FETCH, we remapped this to 
341            FETCH_WITH_HEADER when calling the daemon. So we must
342            strip the extra header off the reply data before passing
343            it back to the client.
344         */
345         if (dstate->readonly_fetch
346         && dstate->client_callid == CTDB_FETCH_FUNC) {
347                 length -= sizeof(struct ctdb_ltdb_header);
348         }
349
350         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
351                                length, struct ctdb_reply_call_old);
352         if (r == NULL) {
353                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
354                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
355                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
356                 return;
357         }
358         r->hdr.reqid        = dstate->reqid;
359         r->status           = dstate->call->status;
360
361         if (dstate->readonly_fetch
362         && dstate->client_callid == CTDB_FETCH_FUNC) {
363                 /* client only asked for a FETCH so we must strip off
364                    the extra ctdb_ltdb header
365                 */
366                 r->datalen          = dstate->call->reply_data.dsize - sizeof(struct ctdb_ltdb_header);
367                 memcpy(&r->data[0], dstate->call->reply_data.dptr + sizeof(struct ctdb_ltdb_header), r->datalen);
368         } else {
369                 r->datalen          = dstate->call->reply_data.dsize;
370                 memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
371         }
372
373         res = daemon_queue_send(client, &r->hdr);
374         if (res == -1) {
375                 /* client is dead - return immediately */
376                 return;
377         }
378         if (res != 0) {
379                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
380         }
381         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
382         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
383         talloc_free(dstate);
384 }
385
386 struct ctdb_daemon_packet_wrap {
387         struct ctdb_context *ctdb;
388         uint32_t client_id;
389 };
390
391 /*
392   a wrapper to catch disconnected clients
393  */
394 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
395 {
396         struct ctdb_client *client;
397         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
398                                                             struct ctdb_daemon_packet_wrap);
399         if (w == NULL) {
400                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
401                 return;
402         }
403
404         client = reqid_find(w->ctdb->idr, w->client_id, struct ctdb_client);
405         if (client == NULL) {
406                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
407                          w->client_id));
408                 talloc_free(w);
409                 return;
410         }
411         talloc_free(w);
412
413         /* process it */
414         daemon_incoming_packet(client, hdr);    
415 }
416
417 struct ctdb_deferred_fetch_call {
418         struct ctdb_deferred_fetch_call *next, *prev;
419         struct ctdb_req_call_old *c;
420         struct ctdb_daemon_packet_wrap *w;
421 };
422
423 struct ctdb_deferred_fetch_queue {
424         struct ctdb_deferred_fetch_call *deferred_calls;
425 };
426
427 struct ctdb_deferred_requeue {
428         struct ctdb_deferred_fetch_call *dfc;
429         struct ctdb_client *client;
430 };
431
432 /* called from a timer event and starts reprocessing the deferred call.*/
433 static void reprocess_deferred_call(struct tevent_context *ev,
434                                     struct tevent_timer *te,
435                                     struct timeval t, void *private_data)
436 {
437         struct ctdb_deferred_requeue *dfr = (struct ctdb_deferred_requeue *)private_data;
438         struct ctdb_client *client = dfr->client;
439
440         talloc_steal(client, dfr->dfc->c);
441         daemon_incoming_packet(client, (struct ctdb_req_header *)dfr->dfc->c);
442         talloc_free(dfr);
443 }
444
445 /* the referral context is destroyed either after a timeout or when the initial
446    fetch-lock has finished.
447    at this stage, immediately start reprocessing the queued up deferred
448    calls so they get reprocessed immediately (and since we are dmaster at
449    this stage, trigger the waiting smbd processes to pick up and aquire the
450    record right away.
451 */
452 static int deferred_fetch_queue_destructor(struct ctdb_deferred_fetch_queue *dfq)
453 {
454
455         /* need to reprocess the packets from the queue explicitely instead of
456            just using a normal destructor since we want, need, to
457            call the clients in the same oder as the requests queued up
458         */
459         while (dfq->deferred_calls != NULL) {
460                 struct ctdb_client *client;
461                 struct ctdb_deferred_fetch_call *dfc = dfq->deferred_calls;
462                 struct ctdb_deferred_requeue *dfr;
463
464                 DLIST_REMOVE(dfq->deferred_calls, dfc);
465
466                 client = reqid_find(dfc->w->ctdb->idr, dfc->w->client_id, struct ctdb_client);
467                 if (client == NULL) {
468                         DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
469                                  dfc->w->client_id));
470                         continue;
471                 }
472
473                 /* process it by pushing it back onto the eventloop */
474                 dfr = talloc(client, struct ctdb_deferred_requeue);
475                 if (dfr == NULL) {
476                         DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch requeue structure\n"));
477                         continue;
478                 }
479
480                 dfr->dfc    = talloc_steal(dfr, dfc);
481                 dfr->client = client;
482
483                 tevent_add_timer(dfc->w->ctdb->ev, client, timeval_zero(),
484                                  reprocess_deferred_call, dfr);
485         }
486
487         return 0;
488 }
489
490 /* insert the new deferral context into the rb tree.
491    there should never be a pre-existing context here, but check for it
492    warn and destroy the previous context if there is already a deferral context
493    for this key.
494 */
495 static void *insert_dfq_callback(void *parm, void *data)
496 {
497         if (data) {
498                 DEBUG(DEBUG_ERR,("Already have DFQ registered. Free old %p and create new %p\n", data, parm));
499                 talloc_free(data);
500         }
501         return parm;
502 }
503
504 /* if the original fetch-lock did not complete within a reasonable time,
505    free the context and context for all deferred requests to cause them to be
506    re-inserted into the event system.
507 */
508 static void dfq_timeout(struct tevent_context *ev, struct tevent_timer *te,
509                         struct timeval t, void *private_data)
510 {
511         talloc_free(private_data);
512 }
513
514 /* This function is used in the local daemon to register a KEY in a database
515    for being "fetched"
516    While the remote fetch is in-flight, any futher attempts to re-fetch the
517    same record will be deferred until the fetch completes.
518 */
519 static int setup_deferred_fetch_locks(struct ctdb_db_context *ctdb_db, struct ctdb_call *call)
520 {
521         uint32_t *k;
522         struct ctdb_deferred_fetch_queue *dfq;
523
524         k = ctdb_key_to_idkey(call, call->key);
525         if (k == NULL) {
526                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
527                 return -1;
528         }
529
530         dfq  = talloc(call, struct ctdb_deferred_fetch_queue);
531         if (dfq == NULL) {
532                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch queue structure\n"));
533                 talloc_free(k);
534                 return -1;
535         }
536         dfq->deferred_calls = NULL;
537
538         trbt_insertarray32_callback(ctdb_db->deferred_fetch, k[0], &k[0], insert_dfq_callback, dfq);
539
540         talloc_set_destructor(dfq, deferred_fetch_queue_destructor);
541
542         /* if the fetch havent completed in 30 seconds, just tear it all down
543            and let it try again as the events are reissued */
544         tevent_add_timer(ctdb_db->ctdb->ev, dfq, timeval_current_ofs(30, 0),
545                          dfq_timeout, dfq);
546
547         talloc_free(k);
548         return 0;
549 }
550
551 /* check if this is a duplicate request to a fetch already in-flight
552    if it is, make this call deferred to be reprocessed later when
553    the in-flight fetch completes.
554 */
555 static int requeue_duplicate_fetch(struct ctdb_db_context *ctdb_db, struct ctdb_client *client, TDB_DATA key, struct ctdb_req_call_old *c)
556 {
557         uint32_t *k;
558         struct ctdb_deferred_fetch_queue *dfq;
559         struct ctdb_deferred_fetch_call *dfc;
560
561         k = ctdb_key_to_idkey(c, key);
562         if (k == NULL) {
563                 DEBUG(DEBUG_ERR,("Failed to allocate key for deferred fetch\n"));
564                 return -1;
565         }
566
567         dfq = trbt_lookuparray32(ctdb_db->deferred_fetch, k[0], &k[0]);
568         if (dfq == NULL) {
569                 talloc_free(k);
570                 return -1;
571         }
572
573
574         talloc_free(k);
575
576         dfc = talloc(dfq, struct ctdb_deferred_fetch_call);
577         if (dfc == NULL) {
578                 DEBUG(DEBUG_ERR, ("Failed to allocate deferred fetch call structure\n"));
579                 return -1;
580         }
581
582         dfc->w = talloc(dfc, struct ctdb_daemon_packet_wrap);
583         if (dfc->w == NULL) {
584                 DEBUG(DEBUG_ERR,("Failed to allocate deferred fetch daemon packet wrap structure\n"));
585                 talloc_free(dfc);
586                 return -1;
587         }
588
589         dfc->c = talloc_steal(dfc, c);
590         dfc->w->ctdb = ctdb_db->ctdb;
591         dfc->w->client_id = client->client_id;
592
593         DLIST_ADD_END(dfq->deferred_calls, dfc);
594
595         return 0;
596 }
597
598
599 /*
600   this is called when the ctdb daemon received a ctdb request call
601   from a local client over the unix domain socket
602  */
603 static void daemon_request_call_from_client(struct ctdb_client *client, 
604                                             struct ctdb_req_call_old *c)
605 {
606         struct ctdb_call_state *state;
607         struct ctdb_db_context *ctdb_db;
608         struct daemon_call_state *dstate;
609         struct ctdb_call *call;
610         struct ctdb_ltdb_header header;
611         TDB_DATA key, data;
612         int ret;
613         struct ctdb_context *ctdb = client->ctdb;
614         struct ctdb_daemon_packet_wrap *w;
615
616         CTDB_INCREMENT_STAT(ctdb, total_calls);
617         CTDB_INCREMENT_STAT(ctdb, pending_calls);
618
619         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
620         if (!ctdb_db) {
621                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
622                           c->db_id));
623                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
624                 return;
625         }
626
627         if (ctdb_db->unhealthy_reason) {
628                 /*
629                  * this is just a warning, as the tdb should be empty anyway,
630                  * and only persistent databases can be unhealthy, which doesn't
631                  * use this code patch
632                  */
633                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
634                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
635         }
636
637         key.dptr = c->data;
638         key.dsize = c->keylen;
639
640         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
641         CTDB_NO_MEMORY_VOID(ctdb, w);   
642
643         w->ctdb = ctdb;
644         w->client_id = client->client_id;
645
646         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
647                                            (struct ctdb_req_header *)c, &data,
648                                            daemon_incoming_packet_wrap, w, true);
649         if (ret == -2) {
650                 /* will retry later */
651                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
652                 return;
653         }
654
655         talloc_free(w);
656
657         if (ret != 0) {
658                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
659                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
660                 return;
661         }
662
663
664         /* check if this fetch request is a duplicate for a
665            request we already have in flight. If so defer it until
666            the first request completes.
667         */
668         if (ctdb->tunable.fetch_collapse == 1) {
669                 if (requeue_duplicate_fetch(ctdb_db, client, key, c) == 0) {
670                         ret = ctdb_ltdb_unlock(ctdb_db, key);
671                         if (ret != 0) {
672                                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
673                         }
674                         CTDB_DECREMENT_STAT(ctdb, pending_calls);
675                         return;
676                 }
677         }
678
679         /* Dont do READONLY if we don't have a tracking database */
680         if ((c->flags & CTDB_WANT_READONLY) && !ctdb_db->readonly) {
681                 c->flags &= ~CTDB_WANT_READONLY;
682         }
683
684         if (header.flags & CTDB_REC_RO_REVOKE_COMPLETE) {
685                 header.flags &= ~CTDB_REC_RO_FLAGS;
686                 CTDB_INCREMENT_STAT(ctdb, total_ro_revokes);
687                 CTDB_INCREMENT_DB_STAT(ctdb_db, db_ro_revokes);
688                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
689                         ctdb_fatal(ctdb, "Failed to write header with cleared REVOKE flag");
690                 }
691                 /* and clear out the tracking data */
692                 if (tdb_delete(ctdb_db->rottdb, key) != 0) {
693                         DEBUG(DEBUG_ERR,(__location__ " Failed to clear out trackingdb record\n"));
694                 }
695         }
696
697         /* if we are revoking, we must defer all other calls until the revoke
698          * had completed.
699          */
700         if (header.flags & CTDB_REC_RO_REVOKING_READONLY) {
701                 talloc_free(data.dptr);
702                 ret = ctdb_ltdb_unlock(ctdb_db, key);
703
704                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
705                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
706                 }
707                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
708                 return;
709         }
710
711         if ((header.dmaster == ctdb->pnn)
712         && (!(c->flags & CTDB_WANT_READONLY))
713         && (header.flags & (CTDB_REC_RO_HAVE_DELEGATIONS|CTDB_REC_RO_HAVE_READONLY)) ) {
714                 header.flags   |= CTDB_REC_RO_REVOKING_READONLY;
715                 if (ctdb_ltdb_store(ctdb_db, key, &header, data) != 0) {
716                         ctdb_fatal(ctdb, "Failed to store record with HAVE_DELEGATIONS set");
717                 }
718                 ret = ctdb_ltdb_unlock(ctdb_db, key);
719
720                 if (ctdb_start_revoke_ro_record(ctdb, ctdb_db, key, &header, data) != 0) {
721                         ctdb_fatal(ctdb, "Failed to start record revoke");
722                 }
723                 talloc_free(data.dptr);
724
725                 if (ctdb_add_revoke_deferred_call(ctdb, ctdb_db, key, (struct ctdb_req_header *)c, daemon_incoming_packet, client) != 0) {
726                         ctdb_fatal(ctdb, "Failed to add deferred call for revoke child");
727                 }
728
729                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
730                 return;
731         }               
732
733         dstate = talloc(client, struct daemon_call_state);
734         if (dstate == NULL) {
735                 ret = ctdb_ltdb_unlock(ctdb_db, key);
736                 if (ret != 0) {
737                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
738                 }
739
740                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
741                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
742                 return;
743         }
744         dstate->start_time = timeval_current();
745         dstate->client = client;
746         dstate->reqid  = c->hdr.reqid;
747         talloc_steal(dstate, data.dptr);
748
749         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
750         if (call == NULL) {
751                 ret = ctdb_ltdb_unlock(ctdb_db, key);
752                 if (ret != 0) {
753                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
754                 }
755
756                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
757                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
758                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
759                 return;
760         }
761
762         dstate->readonly_fetch = 0;
763         call->call_id = c->callid;
764         call->key = key;
765         call->call_data.dptr = c->data + c->keylen;
766         call->call_data.dsize = c->calldatalen;
767         call->flags = c->flags;
768
769         if (c->flags & CTDB_WANT_READONLY) {
770                 /* client wants readonly record, so translate this into a 
771                    fetch with header. remember what the client asked for
772                    so we can remap the reply back to the proper format for
773                    the client in the reply
774                  */
775                 dstate->client_callid = call->call_id;
776                 call->call_id = CTDB_FETCH_WITH_HEADER_FUNC;
777                 dstate->readonly_fetch = 1;
778         }
779
780         if (header.dmaster == ctdb->pnn) {
781                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
782         } else {
783                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
784                 if (ctdb->tunable.fetch_collapse == 1) {
785                         /* This request triggered a remote fetch-lock.
786                            set up a deferral for this key so any additional
787                            fetch-locks are deferred until the current one
788                            finishes.
789                          */
790                         setup_deferred_fetch_locks(ctdb_db, call);
791                 }
792         }
793
794         ret = ctdb_ltdb_unlock(ctdb_db, key);
795         if (ret != 0) {
796                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
797         }
798
799         if (state == NULL) {
800                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
801                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
802                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
803                 return;
804         }
805         talloc_steal(state, dstate);
806         talloc_steal(client, state);
807
808         state->async.fn = daemon_call_from_client_callback;
809         state->async.private_data = dstate;
810 }
811
812
813 static void daemon_request_control_from_client(struct ctdb_client *client, 
814                                                struct ctdb_req_control_old *c);
815
816 /* data contains a packet from the client */
817 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
818 {
819         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
820         TALLOC_CTX *tmp_ctx;
821         struct ctdb_context *ctdb = client->ctdb;
822
823         /* place the packet as a child of a tmp_ctx. We then use
824            talloc_free() below to free it. If any of the calls want
825            to keep it, then they will steal it somewhere else, and the
826            talloc_free() will be a no-op */
827         tmp_ctx = talloc_new(client);
828         talloc_steal(tmp_ctx, hdr);
829
830         if (hdr->ctdb_magic != CTDB_MAGIC) {
831                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
832                 goto done;
833         }
834
835         if (hdr->ctdb_version != CTDB_PROTOCOL) {
836                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
837                 goto done;
838         }
839
840         switch (hdr->operation) {
841         case CTDB_REQ_CALL:
842                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
843                 daemon_request_call_from_client(client, (struct ctdb_req_call_old *)hdr);
844                 break;
845
846         case CTDB_REQ_MESSAGE:
847                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
848                 daemon_request_message_from_client(client, (struct ctdb_req_message_old *)hdr);
849                 break;
850
851         case CTDB_REQ_CONTROL:
852                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
853                 daemon_request_control_from_client(client, (struct ctdb_req_control_old *)hdr);
854                 break;
855
856         default:
857                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
858                          hdr->operation));
859         }
860
861 done:
862         talloc_free(tmp_ctx);
863 }
864
865 /*
866   called when the daemon gets a incoming packet
867  */
868 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
869 {
870         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
871         struct ctdb_req_header *hdr;
872
873         if (cnt == 0) {
874                 talloc_free(client);
875                 return;
876         }
877
878         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
879
880         if (cnt < sizeof(*hdr)) {
881                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
882                                (unsigned)cnt);
883                 return;
884         }
885         hdr = (struct ctdb_req_header *)data;
886         if (cnt != hdr->length) {
887                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
888                                (unsigned)hdr->length, (unsigned)cnt);
889                 return;
890         }
891
892         if (hdr->ctdb_magic != CTDB_MAGIC) {
893                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
894                 return;
895         }
896
897         if (hdr->ctdb_version != CTDB_PROTOCOL) {
898                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
899                 return;
900         }
901
902         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
903                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
904                  hdr->srcnode, hdr->destnode));
905
906         /* it is the responsibility of the incoming packet function to free 'data' */
907         daemon_incoming_packet(client, hdr);
908 }
909
910
911 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
912 {
913         if (client_pid->ctdb->client_pids != NULL) {
914                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
915         }
916
917         return 0;
918 }
919
920
921 static void ctdb_accept_client(struct tevent_context *ev,
922                                struct tevent_fd *fde, uint16_t flags,
923                                void *private_data)
924 {
925         struct sockaddr_un addr;
926         socklen_t len;
927         int fd;
928         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
929         struct ctdb_client *client;
930         struct ctdb_client_pid_list *client_pid;
931         pid_t peer_pid = 0;
932
933         memset(&addr, 0, sizeof(addr));
934         len = sizeof(addr);
935         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
936         if (fd == -1) {
937                 return;
938         }
939
940         set_nonblocking(fd);
941         set_close_on_exec(fd);
942
943         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
944
945         client = talloc_zero(ctdb, struct ctdb_client);
946         if (ctdb_get_peer_pid(fd, &peer_pid) == 0) {
947                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)peer_pid));
948         }
949
950         client->ctdb = ctdb;
951         client->fd = fd;
952         client->client_id = reqid_new(ctdb->idr, client);
953         client->pid = peer_pid;
954
955         client_pid = talloc(client, struct ctdb_client_pid_list);
956         if (client_pid == NULL) {
957                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
958                 close(fd);
959                 talloc_free(client);
960                 return;
961         }               
962         client_pid->ctdb   = ctdb;
963         client_pid->pid    = peer_pid;
964         client_pid->client = client;
965
966         DLIST_ADD(ctdb->client_pids, client_pid);
967
968         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
969                                          ctdb_daemon_read_cb, client,
970                                          "client-%u", client->pid);
971
972         talloc_set_destructor(client, ctdb_client_destructor);
973         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
974         ctdb->num_clients++;
975 }
976
977
978
979 /*
980   create a unix domain socket and bind it
981   return a file descriptor open on the socket 
982 */
983 static int ux_socket_bind(struct ctdb_context *ctdb)
984 {
985         struct sockaddr_un addr;
986
987         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
988         if (ctdb->daemon.sd == -1) {
989                 return -1;
990         }
991
992         memset(&addr, 0, sizeof(addr));
993         addr.sun_family = AF_UNIX;
994         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path)-1);
995
996         /* First check if an old ctdbd might be running */
997         if (connect(ctdb->daemon.sd,
998                     (struct sockaddr *)&addr, sizeof(addr)) == 0) {
999                 DEBUG(DEBUG_CRIT,
1000                       ("Something is already listening on ctdb socket '%s'\n",
1001                        ctdb->daemon.name));
1002                 goto failed;
1003         }
1004
1005         /* Remove any old socket */
1006         unlink(ctdb->daemon.name);
1007
1008         set_close_on_exec(ctdb->daemon.sd);
1009         set_nonblocking(ctdb->daemon.sd);
1010
1011         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
1012                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
1013                 goto failed;
1014         }
1015
1016         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
1017             chmod(ctdb->daemon.name, 0700) != 0) {
1018                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
1019                 goto failed;
1020         }
1021
1022
1023         if (listen(ctdb->daemon.sd, 100) != 0) {
1024                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
1025                 goto failed;
1026         }
1027
1028         return 0;
1029
1030 failed:
1031         close(ctdb->daemon.sd);
1032         ctdb->daemon.sd = -1;
1033         return -1;      
1034 }
1035
1036 static void initialise_node_flags (struct ctdb_context *ctdb)
1037 {
1038         if (ctdb->pnn == -1) {
1039                 ctdb_fatal(ctdb, "PNN is set to -1 (unknown value)");
1040         }
1041
1042         ctdb->nodes[ctdb->pnn]->flags &= ~NODE_FLAGS_DISCONNECTED;
1043
1044         /* do we start out in DISABLED mode? */
1045         if (ctdb->start_as_disabled != 0) {
1046                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in DISABLED state\n"));
1047                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_DISABLED;
1048         }
1049         /* do we start out in STOPPED mode? */
1050         if (ctdb->start_as_stopped != 0) {
1051                 DEBUG(DEBUG_NOTICE, ("This node is configured to start in STOPPED state\n"));
1052                 ctdb->nodes[ctdb->pnn]->flags |= NODE_FLAGS_STOPPED;
1053         }
1054 }
1055
1056 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
1057                                       void *private_data)
1058 {
1059         if (status != 0) {
1060                 ctdb_die(ctdb, "Failed to run setup event");
1061         }
1062         ctdb_run_notification_script(ctdb, "setup");
1063
1064         /* tell all other nodes we've just started up */
1065         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
1066                                  0, CTDB_CONTROL_STARTUP, 0,
1067                                  CTDB_CTRL_FLAG_NOREPLY,
1068                                  tdb_null, NULL, NULL);
1069
1070         /* Start the recovery daemon */
1071         if (ctdb_start_recoverd(ctdb) != 0) {
1072                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
1073                 exit(11);
1074         }
1075
1076         ctdb_start_periodic_events(ctdb);
1077
1078         ctdb_wait_for_first_recovery(ctdb);
1079 }
1080
1081 static struct timeval tevent_before_wait_ts;
1082 static struct timeval tevent_after_wait_ts;
1083
1084 static void ctdb_tevent_trace(enum tevent_trace_point tp,
1085                               void *private_data)
1086 {
1087         struct timeval diff;
1088         struct timeval now;
1089         struct ctdb_context *ctdb =
1090                 talloc_get_type(private_data, struct ctdb_context);
1091
1092         if (getpid() != ctdb->ctdbd_pid) {
1093                 return;
1094         }
1095
1096         now = timeval_current();
1097
1098         switch (tp) {
1099         case TEVENT_TRACE_BEFORE_WAIT:
1100                 if (!timeval_is_zero(&tevent_after_wait_ts)) {
1101                         diff = timeval_until(&tevent_after_wait_ts, &now);
1102                         if (diff.tv_sec > 3) {
1103                                 DEBUG(DEBUG_ERR,
1104                                       ("Handling event took %ld seconds!\n",
1105                                        (long)diff.tv_sec));
1106                         }
1107                 }
1108                 tevent_before_wait_ts = now;
1109                 break;
1110
1111         case TEVENT_TRACE_AFTER_WAIT:
1112                 if (!timeval_is_zero(&tevent_before_wait_ts)) {
1113                         diff = timeval_until(&tevent_before_wait_ts, &now);
1114                         if (diff.tv_sec > 3) {
1115                                 DEBUG(DEBUG_CRIT,
1116                                       ("No event for %ld seconds!\n",
1117                                        (long)diff.tv_sec));
1118                         }
1119                 }
1120                 tevent_after_wait_ts = now;
1121                 break;
1122
1123         default:
1124                 /* Do nothing for future tevent trace points */ ;
1125         }
1126 }
1127
1128 static void ctdb_remove_pidfile(void)
1129 {
1130         /* Only the main ctdbd's PID matches the SID */
1131         if (ctdbd_pidfile != NULL && getsid(0) == getpid()) {
1132                 if (unlink(ctdbd_pidfile) == 0) {
1133                         DEBUG(DEBUG_NOTICE, ("Removed PID file %s\n",
1134                                              ctdbd_pidfile));
1135                 } else {
1136                         DEBUG(DEBUG_WARNING, ("Failed to Remove PID file %s\n",
1137                                               ctdbd_pidfile));
1138                 }
1139         }
1140 }
1141
1142 static void ctdb_create_pidfile(pid_t pid)
1143 {
1144         if (ctdbd_pidfile != NULL) {
1145                 FILE *fp;
1146
1147                 fp = fopen(ctdbd_pidfile, "w");
1148                 if (fp == NULL) {
1149                         DEBUG(DEBUG_ALERT,
1150                               ("Failed to open PID file %s\n", ctdbd_pidfile));
1151                         exit(11);
1152                 }
1153
1154                 fprintf(fp, "%d\n", pid);
1155                 fclose(fp);
1156                 DEBUG(DEBUG_NOTICE, ("Created PID file %s\n", ctdbd_pidfile));
1157                 atexit(ctdb_remove_pidfile);
1158         }
1159 }
1160
1161 static void ctdb_initialise_vnn_map(struct ctdb_context *ctdb)
1162 {
1163         int i, j, count;
1164
1165         /* initialize the vnn mapping table, skipping any deleted nodes */
1166         ctdb->vnn_map = talloc(ctdb, struct ctdb_vnn_map);
1167         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map);
1168
1169         count = 0;
1170         for (i = 0; i < ctdb->num_nodes; i++) {
1171                 if ((ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) == 0) {
1172                         count++;
1173                 }
1174         }
1175
1176         ctdb->vnn_map->generation = INVALID_GENERATION;
1177         ctdb->vnn_map->size = count;
1178         ctdb->vnn_map->map = talloc_array(ctdb->vnn_map, uint32_t, ctdb->vnn_map->size);
1179         CTDB_NO_MEMORY_FATAL(ctdb, ctdb->vnn_map->map);
1180
1181         for(i=0, j=0; i < ctdb->vnn_map->size; i++) {
1182                 if (ctdb->nodes[i]->flags & NODE_FLAGS_DELETED) {
1183                         continue;
1184                 }
1185                 ctdb->vnn_map->map[j] = i;
1186                 j++;
1187         }
1188 }
1189
1190 static void ctdb_set_my_pnn(struct ctdb_context *ctdb)
1191 {
1192         int nodeid;
1193
1194         if (ctdb->address == NULL) {
1195                 ctdb_fatal(ctdb,
1196                            "Can not determine PNN - node address is not set\n");
1197         }
1198
1199         nodeid = ctdb_ip_to_nodeid(ctdb, ctdb->address);
1200         if (nodeid == -1) {
1201                 ctdb_fatal(ctdb,
1202                            "Can not determine PNN - node address not found in node list\n");
1203         }
1204
1205         ctdb->pnn = ctdb->nodes[nodeid]->pnn;
1206         DEBUG(DEBUG_NOTICE, ("PNN is %u\n", ctdb->pnn));
1207 }
1208
1209 /*
1210   start the protocol going as a daemon
1211 */
1212 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork)
1213 {
1214         int res, ret = -1;
1215         struct tevent_fd *fde;
1216
1217         /* create a unix domain stream socket to listen to */
1218         res = ux_socket_bind(ctdb);
1219         if (res!=0) {
1220                 DEBUG(DEBUG_ALERT,("Cannot continue.  Exiting!\n"));
1221                 exit(10);
1222         }
1223
1224         if (do_fork && fork()) {
1225                 return 0;
1226         }
1227
1228         tdb_reopen_all(false);
1229
1230         if (do_fork) {
1231                 if (setsid() == -1) {
1232                         ctdb_die(ctdb, "Failed to setsid()\n");
1233                 }
1234                 close(0);
1235                 if (open("/dev/null", O_RDONLY) != 0) {
1236                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
1237                         exit(11);
1238                 }
1239         }
1240         ignore_signal(SIGPIPE);
1241         ignore_signal(SIGUSR1);
1242
1243         ctdb->ctdbd_pid = getpid();
1244         DEBUG(DEBUG_ERR, ("Starting CTDBD (Version %s) as PID: %u\n",
1245                           CTDB_VERSION_STRING, ctdb->ctdbd_pid));
1246         ctdb_create_pidfile(ctdb->ctdbd_pid);
1247
1248         /* Make sure we log something when the daemon terminates.
1249          * This must be the first exit handler to run (so the last to
1250          * be registered.
1251          */
1252         atexit(print_exit_message);
1253
1254         if (ctdb->do_setsched) {
1255                 /* try to set us up as realtime */
1256                 if (!set_scheduler()) {
1257                         exit(1);
1258                 }
1259                 DEBUG(DEBUG_NOTICE, ("Set real-time scheduler priority\n"));
1260         }
1261
1262         ctdb->ev = tevent_context_init(NULL);
1263         tevent_loop_allow_nesting(ctdb->ev);
1264         tevent_set_trace_callback(ctdb->ev, ctdb_tevent_trace, ctdb);
1265         ret = ctdb_init_tevent_logging(ctdb);
1266         if (ret != 0) {
1267                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
1268                 exit(1);
1269         }
1270
1271         /* set up a handler to pick up sigchld */
1272         if (ctdb_init_sigchld(ctdb) == NULL) {
1273                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
1274                 exit(1);
1275         }
1276
1277         ctdb_set_child_logging(ctdb);
1278
1279         if (srvid_init(ctdb, &ctdb->srv) != 0) {
1280                 DEBUG(DEBUG_CRIT,("Failed to setup message srvid context\n"));
1281                 exit(1);
1282         }
1283
1284         /* initialize statistics collection */
1285         ctdb_statistics_init(ctdb);
1286
1287         /* force initial recovery for election */
1288         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
1289
1290         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_INIT);
1291         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
1292         if (ret != 0) {
1293                 ctdb_die(ctdb, "Failed to run init event\n");
1294         }
1295         ctdb_run_notification_script(ctdb, "init");
1296
1297         if (strcmp(ctdb->transport, "tcp") == 0) {
1298                 ret = ctdb_tcp_init(ctdb);
1299         }
1300 #ifdef USE_INFINIBAND
1301         if (strcmp(ctdb->transport, "ib") == 0) {
1302                 ret = ctdb_ibw_init(ctdb);
1303         }
1304 #endif
1305         if (ret != 0) {
1306                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
1307                 return -1;
1308         }
1309
1310         if (ctdb->methods == NULL) {
1311                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
1312                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
1313         }
1314
1315         /* Initialise the transport.  This sets the node address if it
1316          * was not set via the command-line. */
1317         if (ctdb->methods->initialise(ctdb) != 0) {
1318                 ctdb_fatal(ctdb, "transport failed to initialise");
1319         }
1320
1321         ctdb_set_my_pnn(ctdb);
1322
1323         initialise_node_flags(ctdb);
1324
1325         if (ctdb->public_addresses_file) {
1326                 ret = ctdb_set_public_addresses(ctdb, true);
1327                 if (ret == -1) {
1328                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
1329                         exit(1);
1330                 }
1331         }
1332
1333         ctdb_initialise_vnn_map(ctdb);
1334
1335         /* attach to existing databases */
1336         if (ctdb_attach_databases(ctdb) != 0) {
1337                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
1338         }
1339
1340         /* start frozen, then let the first election sort things out */
1341         if (!ctdb_blocking_freeze(ctdb)) {
1342                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
1343         }
1344
1345         /* now start accepting clients, only can do this once frozen */
1346         fde = tevent_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, TEVENT_FD_READ,
1347                             ctdb_accept_client, ctdb);
1348         if (fde == NULL) {
1349                 ctdb_fatal(ctdb, "Failed to add daemon socket to event loop");
1350         }
1351         tevent_fd_set_auto_close(fde);
1352
1353         /* Start the transport */
1354         if (ctdb->methods->start(ctdb) != 0) {
1355                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
1356                 ctdb_fatal(ctdb, "transport failed to start");
1357         }
1358
1359         /* Recovery daemon and timed events are started from the
1360          * callback, only after the setup event completes
1361          * successfully.
1362          */
1363         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SETUP);
1364         ret = ctdb_event_script_callback(ctdb,
1365                                          ctdb,
1366                                          ctdb_setup_event_callback,
1367                                          ctdb,
1368                                          CTDB_EVENT_SETUP,
1369                                          "%s",
1370                                          "");
1371         if (ret != 0) {
1372                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
1373                 exit(1);
1374         }
1375
1376         lockdown_memory(ctdb->valgrinding);
1377
1378         /* go into a wait loop to allow other nodes to complete */
1379         tevent_loop_wait(ctdb->ev);
1380
1381         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
1382         exit(1);
1383 }
1384
1385 /*
1386   allocate a packet for use in daemon<->daemon communication
1387  */
1388 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
1389                                                  TALLOC_CTX *mem_ctx, 
1390                                                  enum ctdb_operation operation, 
1391                                                  size_t length, size_t slength,
1392                                                  const char *type)
1393 {
1394         int size;
1395         struct ctdb_req_header *hdr;
1396
1397         length = MAX(length, slength);
1398         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
1399
1400         if (ctdb->methods == NULL) {
1401                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
1402                          operation, (unsigned)length));
1403                 return NULL;
1404         }
1405
1406         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
1407         if (hdr == NULL) {
1408                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
1409                          operation, (unsigned)length));
1410                 return NULL;
1411         }
1412         talloc_set_name_const(hdr, type);
1413         memset(hdr, 0, slength);
1414         hdr->length       = length;
1415         hdr->operation    = operation;
1416         hdr->ctdb_magic   = CTDB_MAGIC;
1417         hdr->ctdb_version = CTDB_PROTOCOL;
1418         hdr->generation   = ctdb->vnn_map->generation;
1419         hdr->srcnode      = ctdb->pnn;
1420
1421         return hdr;     
1422 }
1423
1424 struct daemon_control_state {
1425         struct daemon_control_state *next, *prev;
1426         struct ctdb_client *client;
1427         struct ctdb_req_control_old *c;
1428         uint32_t reqid;
1429         struct ctdb_node *node;
1430 };
1431
1432 /*
1433   callback when a control reply comes in
1434  */
1435 static void daemon_control_callback(struct ctdb_context *ctdb,
1436                                     int32_t status, TDB_DATA data, 
1437                                     const char *errormsg,
1438                                     void *private_data)
1439 {
1440         struct daemon_control_state *state = talloc_get_type(private_data, 
1441                                                              struct daemon_control_state);
1442         struct ctdb_client *client = state->client;
1443         struct ctdb_reply_control_old *r;
1444         size_t len;
1445         int ret;
1446
1447         /* construct a message to send to the client containing the data */
1448         len = offsetof(struct ctdb_reply_control_old, data) + data.dsize;
1449         if (errormsg) {
1450                 len += strlen(errormsg);
1451         }
1452         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1453                                struct ctdb_reply_control_old);
1454         CTDB_NO_MEMORY_VOID(ctdb, r);
1455
1456         r->hdr.reqid     = state->reqid;
1457         r->status        = status;
1458         r->datalen       = data.dsize;
1459         r->errorlen = 0;
1460         memcpy(&r->data[0], data.dptr, data.dsize);
1461         if (errormsg) {
1462                 r->errorlen = strlen(errormsg);
1463                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1464         }
1465
1466         ret = daemon_queue_send(client, &r->hdr);
1467         if (ret != -1) {
1468                 talloc_free(state);
1469         }
1470 }
1471
1472 /*
1473   fail all pending controls to a disconnected node
1474  */
1475 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1476 {
1477         struct daemon_control_state *state;
1478         while ((state = node->pending_controls)) {
1479                 DLIST_REMOVE(node->pending_controls, state);
1480                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1481                                         "node is disconnected", state);
1482         }
1483 }
1484
1485 /*
1486   destroy a daemon_control_state
1487  */
1488 static int daemon_control_destructor(struct daemon_control_state *state)
1489 {
1490         if (state->node) {
1491                 DLIST_REMOVE(state->node->pending_controls, state);
1492         }
1493         return 0;
1494 }
1495
1496 /*
1497   this is called when the ctdb daemon received a ctdb request control
1498   from a local client over the unix domain socket
1499  */
1500 static void daemon_request_control_from_client(struct ctdb_client *client, 
1501                                                struct ctdb_req_control_old *c)
1502 {
1503         TDB_DATA data;
1504         int res;
1505         struct daemon_control_state *state;
1506         TALLOC_CTX *tmp_ctx = talloc_new(client);
1507
1508         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1509                 c->hdr.destnode = client->ctdb->pnn;
1510         }
1511
1512         state = talloc(client, struct daemon_control_state);
1513         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1514
1515         state->client = client;
1516         state->c = talloc_steal(state, c);
1517         state->reqid = c->hdr.reqid;
1518         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1519                 state->node = client->ctdb->nodes[c->hdr.destnode];
1520                 DLIST_ADD(state->node->pending_controls, state);
1521         } else {
1522                 state->node = NULL;
1523         }
1524
1525         talloc_set_destructor(state, daemon_control_destructor);
1526
1527         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1528                 talloc_steal(tmp_ctx, state);
1529         }
1530         
1531         data.dptr = &c->data[0];
1532         data.dsize = c->datalen;
1533         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1534                                        c->srvid, c->opcode, client->client_id,
1535                                        c->flags,
1536                                        data, daemon_control_callback,
1537                                        state);
1538         if (res != 0) {
1539                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1540                          c->hdr.destnode));
1541         }
1542
1543         talloc_free(tmp_ctx);
1544 }
1545
1546 /*
1547   register a call function
1548 */
1549 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1550                          ctdb_fn_t fn, int id)
1551 {
1552         struct ctdb_registered_call *call;
1553         struct ctdb_db_context *ctdb_db;
1554
1555         ctdb_db = find_ctdb_db(ctdb, db_id);
1556         if (ctdb_db == NULL) {
1557                 return -1;
1558         }
1559
1560         call = talloc(ctdb_db, struct ctdb_registered_call);
1561         call->fn = fn;
1562         call->id = id;
1563
1564         DLIST_ADD(ctdb_db->calls, call);        
1565         return 0;
1566 }
1567
1568
1569
1570 /*
1571   this local messaging handler is ugly, but is needed to prevent
1572   recursion in ctdb_send_message() when the destination node is the
1573   same as the source node
1574  */
1575 struct ctdb_local_message {
1576         struct ctdb_context *ctdb;
1577         uint64_t srvid;
1578         TDB_DATA data;
1579 };
1580
1581 static void ctdb_local_message_trigger(struct tevent_context *ev,
1582                                        struct tevent_timer *te,
1583                                        struct timeval t, void *private_data)
1584 {
1585         struct ctdb_local_message *m = talloc_get_type(
1586                 private_data, struct ctdb_local_message);
1587
1588         srvid_dispatch(m->ctdb->srv, m->srvid, CTDB_SRVID_ALL, m->data);
1589         talloc_free(m);
1590 }
1591
1592 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1593 {
1594         struct ctdb_local_message *m;
1595         m = talloc(ctdb, struct ctdb_local_message);
1596         CTDB_NO_MEMORY(ctdb, m);
1597
1598         m->ctdb = ctdb;
1599         m->srvid = srvid;
1600         m->data  = data;
1601         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1602         if (m->data.dptr == NULL) {
1603                 talloc_free(m);
1604                 return -1;
1605         }
1606
1607         /* this needs to be done as an event to prevent recursion */
1608         tevent_add_timer(ctdb->ev, m, timeval_zero(),
1609                          ctdb_local_message_trigger, m);
1610         return 0;
1611 }
1612
1613 /*
1614   send a ctdb message
1615 */
1616 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1617                              uint64_t srvid, TDB_DATA data)
1618 {
1619         struct ctdb_req_message_old *r;
1620         int len;
1621
1622         if (ctdb->methods == NULL) {
1623                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1624                 return -1;
1625         }
1626
1627         /* see if this is a message to ourselves */
1628         if (pnn == ctdb->pnn) {
1629                 return ctdb_local_message(ctdb, srvid, data);
1630         }
1631
1632         len = offsetof(struct ctdb_req_message_old, data) + data.dsize;
1633         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1634                                     struct ctdb_req_message_old);
1635         CTDB_NO_MEMORY(ctdb, r);
1636
1637         r->hdr.destnode  = pnn;
1638         r->srvid         = srvid;
1639         r->datalen       = data.dsize;
1640         memcpy(&r->data[0], data.dptr, data.dsize);
1641
1642         ctdb_queue_packet(ctdb, &r->hdr);
1643
1644         talloc_free(r);
1645         return 0;
1646 }
1647
1648
1649
1650 struct ctdb_client_notify_list {
1651         struct ctdb_client_notify_list *next, *prev;
1652         struct ctdb_context *ctdb;
1653         uint64_t srvid;
1654         TDB_DATA data;
1655 };
1656
1657
1658 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1659 {
1660         int ret;
1661
1662         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1663
1664         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1665         if (ret != 0) {
1666                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1667         }
1668
1669         return 0;
1670 }
1671
1672 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1673 {
1674         struct ctdb_notify_data_old *notify = (struct ctdb_notify_data_old *)indata.dptr;
1675         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1676         struct ctdb_client_notify_list *nl;
1677
1678         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1679
1680         if (indata.dsize < offsetof(struct ctdb_notify_data_old, notify_data)) {
1681                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1682                 return -1;
1683         }
1684
1685         if (indata.dsize != (notify->len + offsetof(struct ctdb_notify_data_old, notify_data))) {
1686                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_notify_data_old, notify_data))));
1687                 return -1;
1688         }
1689
1690
1691         if (client == NULL) {
1692                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1693                 return -1;
1694         }
1695
1696         for(nl=client->notify; nl; nl=nl->next) {
1697                 if (nl->srvid == notify->srvid) {
1698                         break;
1699                 }
1700         }
1701         if (nl != NULL) {
1702                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1703                 return -1;
1704         }
1705
1706         nl = talloc(client, struct ctdb_client_notify_list);
1707         CTDB_NO_MEMORY(ctdb, nl);
1708         nl->ctdb       = ctdb;
1709         nl->srvid      = notify->srvid;
1710         nl->data.dsize = notify->len;
1711         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1712         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1713         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1714         
1715         DLIST_ADD(client->notify, nl);
1716         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1717
1718         return 0;
1719 }
1720
1721 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1722 {
1723         uint64_t srvid = *(uint64_t *)indata.dptr;
1724         struct ctdb_client *client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
1725         struct ctdb_client_notify_list *nl;
1726
1727         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)srvid, client_id));
1728
1729         if (client == NULL) {
1730                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1731                 return -1;
1732         }
1733
1734         for(nl=client->notify; nl; nl=nl->next) {
1735                 if (nl->srvid == srvid) {
1736                         break;
1737                 }
1738         }
1739         if (nl == NULL) {
1740                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)srvid));
1741                 return -1;
1742         }
1743
1744         DLIST_REMOVE(client->notify, nl);
1745         talloc_set_destructor(nl, NULL);
1746         talloc_free(nl);
1747
1748         return 0;
1749 }
1750
1751 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1752 {
1753         struct ctdb_client_pid_list *client_pid;
1754
1755         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1756                 if (client_pid->pid == pid) {
1757                         return client_pid->client;
1758                 }
1759         }
1760         return NULL;
1761 }
1762
1763
1764 /* This control is used by samba when probing if a process (of a samba daemon)
1765    exists on the node.
1766    Samba does this when it needs/wants to check if a subrecord in one of the
1767    databases is still valied, or if it is stale and can be removed.
1768    If the node is in unhealthy or stopped state we just kill of the samba
1769    process holding htis sub-record and return to the calling samba that
1770    the process does not exist.
1771    This allows us to forcefully recall subrecords registered by samba processes
1772    on banned and stopped nodes.
1773 */
1774 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1775 {
1776         struct ctdb_client *client;
1777
1778         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1779                 client = ctdb_find_client_by_pid(ctdb, pid);
1780                 if (client != NULL) {
1781                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1782                         talloc_free(client);
1783                 }
1784                 return -1;
1785         }
1786
1787         return kill(pid, 0);
1788 }
1789
1790 int ctdb_control_getnodesfile(struct ctdb_context *ctdb, uint32_t opcode, TDB_DATA indata, TDB_DATA *outdata)
1791 {
1792         struct ctdb_node_map_old *node_map = NULL;
1793
1794         CHECK_CONTROL_DATA_SIZE(0);
1795
1796         node_map = ctdb_read_nodes_file(ctdb, ctdb->nodes_file);
1797         if (node_map == NULL) {
1798                 DEBUG(DEBUG_ERR, ("Failed to read nodes file\n"));
1799                 return -1;
1800         }
1801
1802         outdata->dptr  = (unsigned char *)node_map;
1803         outdata->dsize = talloc_get_size(outdata->dptr);
1804
1805         return 0;
1806 }
1807
1808 void ctdb_shutdown_sequence(struct ctdb_context *ctdb, int exit_code)
1809 {
1810         if (ctdb->runstate == CTDB_RUNSTATE_SHUTDOWN) {
1811                 DEBUG(DEBUG_NOTICE,("Already shutting down so will not proceed.\n"));
1812                 return;
1813         }
1814
1815         DEBUG(DEBUG_NOTICE,("Shutdown sequence commencing.\n"));
1816         ctdb_set_runstate(ctdb, CTDB_RUNSTATE_SHUTDOWN);
1817         ctdb_stop_recoverd(ctdb);
1818         ctdb_stop_keepalive(ctdb);
1819         ctdb_stop_monitoring(ctdb);
1820         ctdb_release_all_ips(ctdb);
1821         ctdb_event_script(ctdb, CTDB_EVENT_SHUTDOWN);
1822         if (ctdb->methods != NULL) {
1823                 ctdb->methods->shutdown(ctdb);
1824         }
1825
1826         DEBUG(DEBUG_NOTICE,("Shutdown sequence complete, exiting.\n"));
1827         exit(exit_code);
1828 }
1829
1830 /* When forking the main daemon and the child process needs to connect
1831  * back to the daemon as a client process, this function can be used
1832  * to change the ctdb context from daemon into client mode.  The child
1833  * process must be created using ctdb_fork() and not fork() -
1834  * ctdb_fork() does some necessary housekeeping.
1835  */
1836 int switch_from_server_to_client(struct ctdb_context *ctdb, const char *fmt, ...)
1837 {
1838         int ret;
1839         va_list ap;
1840
1841         /* Add extra information so we can identify this in the logs */
1842         va_start(ap, fmt);
1843         debug_extra = talloc_strdup_append(talloc_vasprintf(NULL, fmt, ap), ":");
1844         va_end(ap);
1845
1846         /* get a new event context */
1847         ctdb->ev = tevent_context_init(ctdb);
1848         tevent_loop_allow_nesting(ctdb->ev);
1849
1850         /* Connect to main CTDB daemon */
1851         ret = ctdb_socket_connect(ctdb);
1852         if (ret != 0) {
1853                 DEBUG(DEBUG_ALERT, (__location__ " Failed to init ctdb client\n"));
1854                 return -1;
1855         }
1856
1857         ctdb->can_send_controls = true;
1858
1859         return 0;
1860 }