75344ad386c73bd2ea8b52fa9d859e2e14eb42c9
[ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46
47
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
49                                   struct timeval t, void *private_data)
50 {
51         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
52
53         if (getpid() != ctdbd_pid) {
54                 return;
55         }
56
57         event_add_timed(ctdb->ev, ctdb, 
58                         timeval_current_ofs(1, 0), 
59                         ctdb_time_tick, ctdb);
60 }
61
62 /* Used to trigger a dummy event once per second, to make
63  * detection of hangs more reliable.
64  */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
66 {
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72
73 /* called when the "startup" event script has finished */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
75 {
76         if (ctdb->methods == NULL) {
77                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
79         }
80
81         /* start the transport running */
82         if (ctdb->methods->start(ctdb) != 0) {
83                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84                 ctdb_fatal(ctdb, "transport failed to start");
85         }
86
87         /* start the recovery daemon process */
88         if (ctdb_start_recoverd(ctdb) != 0) {
89                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
90                 exit(11);
91         }
92
93         /* Make sure we log something when the daemon terminates */
94         atexit(print_exit_message);
95
96         /* start monitoring for connected/disconnected nodes */
97         ctdb_start_keepalive(ctdb);
98
99         /* start monitoring for node health */
100         ctdb_start_monitoring(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void block_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
146                                     TDB_DATA data, void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message, data) + data.dsize;
154         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
155                                len, struct ctdb_req_message);
156         CTDB_NO_MEMORY_VOID(ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
182         if (res != 0) {
183                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
184                          (unsigned long long)srvid));
185         } else {
186                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
187                          (unsigned long long)srvid));
188         }
189
190         return res;
191 }
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   remove a srvid from the client
196  */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200         if (client == NULL) {
201                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202                 return -1;
203         }
204         return ctdb_deregister_message_handler(ctdb, srvid, client);
205 }
206
207
208 /*
209   destroy a ctdb_client
210 */
211 static int ctdb_client_destructor(struct ctdb_client *client)
212 {
213         struct ctdb_db_context *ctdb_db;
214
215         ctdb_takeover_client_destructor_hook(client);
216         ctdb_reqid_remove(client->ctdb, client->client_id);
217         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
218
219         if (client->num_persistent_updates != 0) {
220                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
221                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
222         }
223         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
224         if (ctdb_db) {
225                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
226                                   "commit active. Forcing recovery.\n"));
227                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
228
229                 /* legacy trans2 transaction state: */
230                 ctdb_db->transaction_active = false;
231
232                 /*
233                  * trans3 transaction state:
234                  *
235                  * The destructor sets the pointer to NULL.
236                  */
237                 talloc_free(ctdb_db->persistent_state);
238         }
239
240         return 0;
241 }
242
243
244 /*
245   this is called when the ctdb daemon received a ctdb request message
246   from a local client over the unix domain socket
247  */
248 static void daemon_request_message_from_client(struct ctdb_client *client, 
249                                                struct ctdb_req_message *c)
250 {
251         TDB_DATA data;
252         int res;
253
254         /* maybe the message is for another client on this node */
255         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
256                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
257                 return;
258         }
259
260         /* its for a remote node */
261         data.dptr = &c->data[0];
262         data.dsize = c->datalen;
263         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
264                                        c->srvid, data);
265         if (res != 0) {
266                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
267                          c->hdr.destnode));
268         }
269 }
270
271
272 struct daemon_call_state {
273         struct ctdb_client *client;
274         uint32_t reqid;
275         struct ctdb_call *call;
276         struct timeval start_time;
277 };
278
279 /* 
280    complete a call from a client 
281 */
282 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
283 {
284         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
285                                                            struct daemon_call_state);
286         struct ctdb_reply_call *r;
287         int res;
288         uint32_t length;
289         struct ctdb_client *client = dstate->client;
290         struct ctdb_db_context *ctdb_db = state->ctdb_db;
291
292         talloc_steal(client, dstate);
293         talloc_steal(dstate, dstate->call);
294
295         res = ctdb_daemon_call_recv(state, dstate->call);
296         if (res != 0) {
297                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
298                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
299
300                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
301                 return;
302         }
303
304         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
305         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
306                                length, struct ctdb_reply_call);
307         if (r == NULL) {
308                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
309                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
310                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
311                 return;
312         }
313         r->hdr.reqid        = dstate->reqid;
314         r->datalen          = dstate->call->reply_data.dsize;
315         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
316
317         res = daemon_queue_send(client, &r->hdr);
318         if (res == -1) {
319                 /* client is dead - return immediately */
320                 return;
321         }
322         if (res != 0) {
323                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
324         }
325         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
326         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
327         talloc_free(dstate);
328 }
329
330 struct ctdb_daemon_packet_wrap {
331         struct ctdb_context *ctdb;
332         uint32_t client_id;
333 };
334
335 /*
336   a wrapper to catch disconnected clients
337  */
338 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
339 {
340         struct ctdb_client *client;
341         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
342                                                             struct ctdb_daemon_packet_wrap);
343         if (w == NULL) {
344                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
345                 return;
346         }
347
348         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
349         if (client == NULL) {
350                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
351                          w->client_id));
352                 talloc_free(w);
353                 return;
354         }
355         talloc_free(w);
356
357         /* process it */
358         daemon_incoming_packet(client, hdr);    
359 }
360
361
362 /*
363   this is called when the ctdb daemon received a ctdb request call
364   from a local client over the unix domain socket
365  */
366 static void daemon_request_call_from_client(struct ctdb_client *client, 
367                                             struct ctdb_req_call *c)
368 {
369         struct ctdb_call_state *state;
370         struct ctdb_db_context *ctdb_db;
371         struct daemon_call_state *dstate;
372         struct ctdb_call *call;
373         struct ctdb_ltdb_header header;
374         TDB_DATA key, data;
375         int ret;
376         struct ctdb_context *ctdb = client->ctdb;
377         struct ctdb_daemon_packet_wrap *w;
378
379         CTDB_INCREMENT_STAT(ctdb, total_calls);
380         CTDB_DECREMENT_STAT(ctdb, pending_calls);
381
382         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
383         if (!ctdb_db) {
384                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
385                           c->db_id));
386                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
387                 return;
388         }
389
390         if (ctdb_db->unhealthy_reason) {
391                 /*
392                  * this is just a warning, as the tdb should be empty anyway,
393                  * and only persistent databases can be unhealthy, which doesn't
394                  * use this code patch
395                  */
396                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
397                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
398         }
399
400         key.dptr = c->data;
401         key.dsize = c->keylen;
402
403         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
404         CTDB_NO_MEMORY_VOID(ctdb, w);   
405
406         w->ctdb = ctdb;
407         w->client_id = client->client_id;
408
409         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
410                                            (struct ctdb_req_header *)c, &data,
411                                            daemon_incoming_packet_wrap, w, True);
412         if (ret == -2) {
413                 /* will retry later */
414                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
415                 return;
416         }
417
418         talloc_free(w);
419
420         if (ret != 0) {
421                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
422                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
423                 return;
424         }
425
426         dstate = talloc(client, struct daemon_call_state);
427         if (dstate == NULL) {
428                 ret = ctdb_ltdb_unlock(ctdb_db, key);
429                 if (ret != 0) {
430                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
431                 }
432
433                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
434                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
435                 return;
436         }
437         dstate->start_time = timeval_current();
438         dstate->client = client;
439         dstate->reqid  = c->hdr.reqid;
440         talloc_steal(dstate, data.dptr);
441
442         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
443         if (call == NULL) {
444                 ret = ctdb_ltdb_unlock(ctdb_db, key);
445                 if (ret != 0) {
446                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
447                 }
448
449                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
450                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
451                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
452                 return;
453         }
454
455         call->call_id = c->callid;
456         call->key = key;
457         call->call_data.dptr = c->data + c->keylen;
458         call->call_data.dsize = c->calldatalen;
459         call->flags = c->flags;
460
461         if (header.dmaster == ctdb->pnn) {
462                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
463         } else {
464                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
465         }
466
467         ret = ctdb_ltdb_unlock(ctdb_db, key);
468         if (ret != 0) {
469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
470         }
471
472         if (state == NULL) {
473                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
474                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
475                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
476                 return;
477         }
478         talloc_steal(state, dstate);
479         talloc_steal(client, state);
480
481         state->async.fn = daemon_call_from_client_callback;
482         state->async.private_data = dstate;
483 }
484
485
486 static void daemon_request_control_from_client(struct ctdb_client *client, 
487                                                struct ctdb_req_control *c);
488
489 /* data contains a packet from the client */
490 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
491 {
492         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
493         TALLOC_CTX *tmp_ctx;
494         struct ctdb_context *ctdb = client->ctdb;
495
496         /* place the packet as a child of a tmp_ctx. We then use
497            talloc_free() below to free it. If any of the calls want
498            to keep it, then they will steal it somewhere else, and the
499            talloc_free() will be a no-op */
500         tmp_ctx = talloc_new(client);
501         talloc_steal(tmp_ctx, hdr);
502
503         if (hdr->ctdb_magic != CTDB_MAGIC) {
504                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
505                 goto done;
506         }
507
508         if (hdr->ctdb_version != CTDB_VERSION) {
509                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
510                 goto done;
511         }
512
513         switch (hdr->operation) {
514         case CTDB_REQ_CALL:
515                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
516                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
517                 break;
518
519         case CTDB_REQ_MESSAGE:
520                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
521                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
522                 break;
523
524         case CTDB_REQ_CONTROL:
525                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
526                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
527                 break;
528
529         default:
530                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
531                          hdr->operation));
532         }
533
534 done:
535         talloc_free(tmp_ctx);
536 }
537
538 /*
539   called when the daemon gets a incoming packet
540  */
541 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
542 {
543         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
544         struct ctdb_req_header *hdr;
545
546         if (cnt == 0) {
547                 talloc_free(client);
548                 return;
549         }
550
551         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
552
553         if (cnt < sizeof(*hdr)) {
554                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
555                                (unsigned)cnt);
556                 return;
557         }
558         hdr = (struct ctdb_req_header *)data;
559         if (cnt != hdr->length) {
560                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
561                                (unsigned)hdr->length, (unsigned)cnt);
562                 return;
563         }
564
565         if (hdr->ctdb_magic != CTDB_MAGIC) {
566                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
567                 return;
568         }
569
570         if (hdr->ctdb_version != CTDB_VERSION) {
571                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
572                 return;
573         }
574
575         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
576                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
577                  hdr->srcnode, hdr->destnode));
578
579         /* it is the responsibility of the incoming packet function to free 'data' */
580         daemon_incoming_packet(client, hdr);
581 }
582
583
584 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
585 {
586         if (client_pid->ctdb->client_pids != NULL) {
587                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
588         }
589
590         return 0;
591 }
592
593
594 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
595                          uint16_t flags, void *private_data)
596 {
597         struct sockaddr_un addr;
598         socklen_t len;
599         int fd;
600         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
601         struct ctdb_client *client;
602         struct ctdb_client_pid_list *client_pid;
603 #ifdef _AIX
604         struct peercred_struct cr;
605         socklen_t crl = sizeof(struct peercred_struct);
606 #else
607         struct ucred cr;
608         socklen_t crl = sizeof(struct ucred);
609 #endif
610
611         memset(&addr, 0, sizeof(addr));
612         len = sizeof(addr);
613         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
614         if (fd == -1) {
615                 return;
616         }
617
618         set_nonblocking(fd);
619         set_close_on_exec(fd);
620
621         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
622
623         client = talloc_zero(ctdb, struct ctdb_client);
624 #ifdef _AIX
625         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
626 #else
627         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
628 #endif
629                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
630         }
631
632         client->ctdb = ctdb;
633         client->fd = fd;
634         client->client_id = ctdb_reqid_new(ctdb, client);
635         client->pid = cr.pid;
636
637         client_pid = talloc(client, struct ctdb_client_pid_list);
638         if (client_pid == NULL) {
639                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
640                 close(fd);
641                 talloc_free(client);
642                 return;
643         }               
644         client_pid->ctdb   = ctdb;
645         client_pid->pid    = cr.pid;
646         client_pid->client = client;
647
648         DLIST_ADD(ctdb->client_pids, client_pid);
649
650         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
651                                          ctdb_daemon_read_cb, client,
652                                          "client-%u", client->pid);
653
654         talloc_set_destructor(client, ctdb_client_destructor);
655         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
656         CTDB_INCREMENT_STAT(ctdb, num_clients);
657 }
658
659
660
661 /*
662   create a unix domain socket and bind it
663   return a file descriptor open on the socket 
664 */
665 static int ux_socket_bind(struct ctdb_context *ctdb)
666 {
667         struct sockaddr_un addr;
668
669         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
670         if (ctdb->daemon.sd == -1) {
671                 return -1;
672         }
673
674         set_close_on_exec(ctdb->daemon.sd);
675         set_nonblocking(ctdb->daemon.sd);
676
677         memset(&addr, 0, sizeof(addr));
678         addr.sun_family = AF_UNIX;
679         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
680
681         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
682                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
683                 goto failed;
684         }       
685
686         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
687             chmod(ctdb->daemon.name, 0700) != 0) {
688                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
689                 goto failed;
690         } 
691
692
693         if (listen(ctdb->daemon.sd, 100) != 0) {
694                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
695                 goto failed;
696         }
697
698         return 0;
699
700 failed:
701         close(ctdb->daemon.sd);
702         ctdb->daemon.sd = -1;
703         return -1;      
704 }
705
706 static void sig_child_handler(struct event_context *ev,
707         struct signal_event *se, int signum, int count,
708         void *dont_care, 
709         void *private_data)
710 {
711 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
712         int status;
713         pid_t pid = -1;
714
715         while (pid != 0) {
716                 pid = waitpid(-1, &status, WNOHANG);
717                 if (pid == -1) {
718                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
719                         return;
720                 }
721                 if (pid > 0) {
722                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
723                 }
724         }
725 }
726
727 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
728                                       void *private_data)
729 {
730         if (status != 0) {
731                 ctdb_fatal(ctdb, "Failed to run setup event\n");
732                 return;
733         }
734         ctdb_run_notification_script(ctdb, "setup");
735
736         /* tell all other nodes we've just started up */
737         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
738                                  0, CTDB_CONTROL_STARTUP, 0,
739                                  CTDB_CTRL_FLAG_NOREPLY,
740                                  tdb_null, NULL, NULL);
741 }
742
743 /*
744   start the protocol going as a daemon
745 */
746 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
747 {
748         int res, ret = -1;
749         struct fd_event *fde;
750         const char *domain_socket_name;
751         struct signal_event *se;
752
753         /* get rid of any old sockets */
754         unlink(ctdb->daemon.name);
755
756         /* create a unix domain stream socket to listen to */
757         res = ux_socket_bind(ctdb);
758         if (res!=0) {
759                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
760                 exit(10);
761         }
762
763         if (do_fork && fork()) {
764                 return 0;
765         }
766
767         tdb_reopen_all(False);
768
769         if (do_fork) {
770                 setsid();
771                 close(0);
772                 if (open("/dev/null", O_RDONLY) != 0) {
773                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
774                         exit(11);
775                 }
776         }
777         block_signal(SIGPIPE);
778
779         ctdbd_pid = getpid();
780         ctdb->ctdbd_pid = ctdbd_pid;
781
782
783         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
784
785         if (ctdb->do_setsched) {
786                 /* try to set us up as realtime */
787                 ctdb_set_scheduler(ctdb);
788         }
789
790         /* ensure the socket is deleted on exit of the daemon */
791         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
792         if (domain_socket_name == NULL) {
793                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
794                 exit(12);
795         }
796
797         ctdb->ev = event_context_init(NULL);
798         tevent_loop_allow_nesting(ctdb->ev);
799         ret = ctdb_init_tevent_logging(ctdb);
800         if (ret != 0) {
801                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
802                 exit(1);
803         }
804
805         ctdb_set_child_logging(ctdb);
806
807         /* initialize statistics collection */
808         ctdb_statistics_init(ctdb);
809
810         /* force initial recovery for election */
811         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
812
813         if (strcmp(ctdb->transport, "tcp") == 0) {
814                 int ctdb_tcp_init(struct ctdb_context *);
815                 ret = ctdb_tcp_init(ctdb);
816         }
817 #ifdef USE_INFINIBAND
818         if (strcmp(ctdb->transport, "ib") == 0) {
819                 int ctdb_ibw_init(struct ctdb_context *);
820                 ret = ctdb_ibw_init(ctdb);
821         }
822 #endif
823         if (ret != 0) {
824                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
825                 return -1;
826         }
827
828         if (ctdb->methods == NULL) {
829                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
830                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
831         }
832
833         /* initialise the transport  */
834         if (ctdb->methods->initialise(ctdb) != 0) {
835                 ctdb_fatal(ctdb, "transport failed to initialise");
836         }
837         if (public_address_list) {
838                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
839                 if (ret == -1) {
840                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
841                         exit(1);
842                 }
843         }
844
845
846         /* attach to existing databases */
847         if (ctdb_attach_databases(ctdb) != 0) {
848                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
849         }
850
851         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
852         if (ret != 0) {
853                 ctdb_fatal(ctdb, "Failed to run init event\n");
854         }
855         ctdb_run_notification_script(ctdb, "init");
856
857         /* start frozen, then let the first election sort things out */
858         if (ctdb_blocking_freeze(ctdb)) {
859                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
860         }
861
862         /* now start accepting clients, only can do this once frozen */
863         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
864                            EVENT_FD_READ,
865                            ctdb_accept_client, ctdb);
866         tevent_fd_set_auto_close(fde);
867
868         /* release any IPs we hold from previous runs of the daemon */
869         if (ctdb->tunable.disable_ip_failover == 0) {
870                 ctdb_release_all_ips(ctdb);
871         }
872
873         /* start the transport going */
874         ctdb_start_transport(ctdb);
875
876         /* set up a handler to pick up sigchld */
877         se = event_add_signal(ctdb->ev, ctdb,
878                                      SIGCHLD, 0,
879                                      sig_child_handler,
880                                      ctdb);
881         if (se == NULL) {
882                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
883                 exit(1);
884         }
885
886         ret = ctdb_event_script_callback(ctdb,
887                                          ctdb,
888                                          ctdb_setup_event_callback,
889                                          ctdb,
890                                          false,
891                                          CTDB_EVENT_SETUP,
892                                          "");
893         if (ret != 0) {
894                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
895                 exit(1);
896         }
897
898         if (use_syslog) {
899                 if (start_syslog_daemon(ctdb)) {
900                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
901                         exit(10);
902                 }
903         }
904
905         ctdb_lockdown_memory(ctdb);
906           
907         /* go into a wait loop to allow other nodes to complete */
908         event_loop_wait(ctdb->ev);
909
910         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
911         exit(1);
912 }
913
914 /*
915   allocate a packet for use in daemon<->daemon communication
916  */
917 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
918                                                  TALLOC_CTX *mem_ctx, 
919                                                  enum ctdb_operation operation, 
920                                                  size_t length, size_t slength,
921                                                  const char *type)
922 {
923         int size;
924         struct ctdb_req_header *hdr;
925
926         length = MAX(length, slength);
927         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
928
929         if (ctdb->methods == NULL) {
930                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
931                          operation, (unsigned)length));
932                 return NULL;
933         }
934
935         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
936         if (hdr == NULL) {
937                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
938                          operation, (unsigned)length));
939                 return NULL;
940         }
941         talloc_set_name_const(hdr, type);
942         memset(hdr, 0, slength);
943         hdr->length       = length;
944         hdr->operation    = operation;
945         hdr->ctdb_magic   = CTDB_MAGIC;
946         hdr->ctdb_version = CTDB_VERSION;
947         hdr->generation   = ctdb->vnn_map->generation;
948         hdr->srcnode      = ctdb->pnn;
949
950         return hdr;     
951 }
952
953 struct daemon_control_state {
954         struct daemon_control_state *next, *prev;
955         struct ctdb_client *client;
956         struct ctdb_req_control *c;
957         uint32_t reqid;
958         struct ctdb_node *node;
959 };
960
961 /*
962   callback when a control reply comes in
963  */
964 static void daemon_control_callback(struct ctdb_context *ctdb,
965                                     int32_t status, TDB_DATA data, 
966                                     const char *errormsg,
967                                     void *private_data)
968 {
969         struct daemon_control_state *state = talloc_get_type(private_data, 
970                                                              struct daemon_control_state);
971         struct ctdb_client *client = state->client;
972         struct ctdb_reply_control *r;
973         size_t len;
974         int ret;
975
976         /* construct a message to send to the client containing the data */
977         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
978         if (errormsg) {
979                 len += strlen(errormsg);
980         }
981         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
982                                struct ctdb_reply_control);
983         CTDB_NO_MEMORY_VOID(ctdb, r);
984
985         r->hdr.reqid     = state->reqid;
986         r->status        = status;
987         r->datalen       = data.dsize;
988         r->errorlen = 0;
989         memcpy(&r->data[0], data.dptr, data.dsize);
990         if (errormsg) {
991                 r->errorlen = strlen(errormsg);
992                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
993         }
994
995         ret = daemon_queue_send(client, &r->hdr);
996         if (ret != -1) {
997                 talloc_free(state);
998         }
999 }
1000
1001 /*
1002   fail all pending controls to a disconnected node
1003  */
1004 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1005 {
1006         struct daemon_control_state *state;
1007         while ((state = node->pending_controls)) {
1008                 DLIST_REMOVE(node->pending_controls, state);
1009                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1010                                         "node is disconnected", state);
1011         }
1012 }
1013
1014 /*
1015   destroy a daemon_control_state
1016  */
1017 static int daemon_control_destructor(struct daemon_control_state *state)
1018 {
1019         if (state->node) {
1020                 DLIST_REMOVE(state->node->pending_controls, state);
1021         }
1022         return 0;
1023 }
1024
1025 /*
1026   this is called when the ctdb daemon received a ctdb request control
1027   from a local client over the unix domain socket
1028  */
1029 static void daemon_request_control_from_client(struct ctdb_client *client, 
1030                                                struct ctdb_req_control *c)
1031 {
1032         TDB_DATA data;
1033         int res;
1034         struct daemon_control_state *state;
1035         TALLOC_CTX *tmp_ctx = talloc_new(client);
1036
1037         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1038                 c->hdr.destnode = client->ctdb->pnn;
1039         }
1040
1041         state = talloc(client, struct daemon_control_state);
1042         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1043
1044         state->client = client;
1045         state->c = talloc_steal(state, c);
1046         state->reqid = c->hdr.reqid;
1047         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1048                 state->node = client->ctdb->nodes[c->hdr.destnode];
1049                 DLIST_ADD(state->node->pending_controls, state);
1050         } else {
1051                 state->node = NULL;
1052         }
1053
1054         talloc_set_destructor(state, daemon_control_destructor);
1055
1056         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1057                 talloc_steal(tmp_ctx, state);
1058         }
1059         
1060         data.dptr = &c->data[0];
1061         data.dsize = c->datalen;
1062         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1063                                        c->srvid, c->opcode, client->client_id,
1064                                        c->flags,
1065                                        data, daemon_control_callback,
1066                                        state);
1067         if (res != 0) {
1068                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1069                          c->hdr.destnode));
1070         }
1071
1072         talloc_free(tmp_ctx);
1073 }
1074
1075 /*
1076   register a call function
1077 */
1078 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1079                          ctdb_fn_t fn, int id)
1080 {
1081         struct ctdb_registered_call *call;
1082         struct ctdb_db_context *ctdb_db;
1083
1084         ctdb_db = find_ctdb_db(ctdb, db_id);
1085         if (ctdb_db == NULL) {
1086                 return -1;
1087         }
1088
1089         call = talloc(ctdb_db, struct ctdb_registered_call);
1090         call->fn = fn;
1091         call->id = id;
1092
1093         DLIST_ADD(ctdb_db->calls, call);        
1094         return 0;
1095 }
1096
1097
1098
1099 /*
1100   this local messaging handler is ugly, but is needed to prevent
1101   recursion in ctdb_send_message() when the destination node is the
1102   same as the source node
1103  */
1104 struct ctdb_local_message {
1105         struct ctdb_context *ctdb;
1106         uint64_t srvid;
1107         TDB_DATA data;
1108 };
1109
1110 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1111                                        struct timeval t, void *private_data)
1112 {
1113         struct ctdb_local_message *m = talloc_get_type(private_data, 
1114                                                        struct ctdb_local_message);
1115         int res;
1116
1117         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1118         if (res != 0) {
1119                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1120                           (unsigned long long)m->srvid));
1121         }
1122         talloc_free(m);
1123 }
1124
1125 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1126 {
1127         struct ctdb_local_message *m;
1128         m = talloc(ctdb, struct ctdb_local_message);
1129         CTDB_NO_MEMORY(ctdb, m);
1130
1131         m->ctdb = ctdb;
1132         m->srvid = srvid;
1133         m->data  = data;
1134         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1135         if (m->data.dptr == NULL) {
1136                 talloc_free(m);
1137                 return -1;
1138         }
1139
1140         /* this needs to be done as an event to prevent recursion */
1141         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1142         return 0;
1143 }
1144
1145 /*
1146   send a ctdb message
1147 */
1148 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1149                              uint64_t srvid, TDB_DATA data)
1150 {
1151         struct ctdb_req_message *r;
1152         int len;
1153
1154         if (ctdb->methods == NULL) {
1155                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1156                 return -1;
1157         }
1158
1159         /* see if this is a message to ourselves */
1160         if (pnn == ctdb->pnn) {
1161                 return ctdb_local_message(ctdb, srvid, data);
1162         }
1163
1164         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1165         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1166                                     struct ctdb_req_message);
1167         CTDB_NO_MEMORY(ctdb, r);
1168
1169         r->hdr.destnode  = pnn;
1170         r->srvid         = srvid;
1171         r->datalen       = data.dsize;
1172         memcpy(&r->data[0], data.dptr, data.dsize);
1173
1174         ctdb_queue_packet(ctdb, &r->hdr);
1175
1176         talloc_free(r);
1177         return 0;
1178 }
1179
1180
1181
1182 struct ctdb_client_notify_list {
1183         struct ctdb_client_notify_list *next, *prev;
1184         struct ctdb_context *ctdb;
1185         uint64_t srvid;
1186         TDB_DATA data;
1187 };
1188
1189
1190 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1191 {
1192         int ret;
1193
1194         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1195
1196         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1197         if (ret != 0) {
1198                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1199         }
1200
1201         return 0;
1202 }
1203
1204 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1205 {
1206         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1207         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1208         struct ctdb_client_notify_list *nl;
1209
1210         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1211
1212         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1213                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1214                 return -1;
1215         }
1216
1217         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1218                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1219                 return -1;
1220         }
1221
1222
1223         if (client == NULL) {
1224                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1225                 return -1;
1226         }
1227
1228         for(nl=client->notify; nl; nl=nl->next) {
1229                 if (nl->srvid == notify->srvid) {
1230                         break;
1231                 }
1232         }
1233         if (nl != NULL) {
1234                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1235                 return -1;
1236         }
1237
1238         nl = talloc(client, struct ctdb_client_notify_list);
1239         CTDB_NO_MEMORY(ctdb, nl);
1240         nl->ctdb       = ctdb;
1241         nl->srvid      = notify->srvid;
1242         nl->data.dsize = notify->len;
1243         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1244         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1245         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1246         
1247         DLIST_ADD(client->notify, nl);
1248         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1249
1250         return 0;
1251 }
1252
1253 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1254 {
1255         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1256         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1257         struct ctdb_client_notify_list *nl;
1258
1259         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1260
1261         if (client == NULL) {
1262                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1263                 return -1;
1264         }
1265
1266         for(nl=client->notify; nl; nl=nl->next) {
1267                 if (nl->srvid == notify->srvid) {
1268                         break;
1269                 }
1270         }
1271         if (nl == NULL) {
1272                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1273                 return -1;
1274         }
1275
1276         DLIST_REMOVE(client->notify, nl);
1277         talloc_set_destructor(nl, NULL);
1278         talloc_free(nl);
1279
1280         return 0;
1281 }
1282
1283 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1284 {
1285         struct ctdb_client_pid_list *client_pid;
1286
1287         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1288                 if (client_pid->pid == pid) {
1289                         return client_pid->client;
1290                 }
1291         }
1292         return NULL;
1293 }
1294
1295
1296 /* This control is used by samba when probing if a process (of a samba daemon)
1297    exists on the node.
1298    Samba does this when it needs/wants to check if a subrecord in one of the
1299    databases is still valied, or if it is stale and can be removed.
1300    If the node is in unhealthy or stopped state we just kill of the samba
1301    process holding htis sub-record and return to the calling samba that
1302    the process does not exist.
1303    This allows us to forcefully recall subrecords registered by samba processes
1304    on banned and stopped nodes.
1305 */
1306 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1307 {
1308         struct ctdb_client *client;
1309
1310         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1311                 client = ctdb_find_client_by_pid(ctdb, pid);
1312                 if (client != NULL) {
1313                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1314                         talloc_free(client);
1315                 }
1316                 return -1;
1317         }
1318
1319         return kill(pid, 0);
1320 }