daemon: correctly end a running trans3_commit if the client disconnects.
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46
47
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
49                                   struct timeval t, void *private_data)
50 {
51         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
52
53         if (getpid() != ctdbd_pid) {
54                 return;
55         }
56
57         event_add_timed(ctdb->ev, ctdb, 
58                         timeval_current_ofs(1, 0), 
59                         ctdb_time_tick, ctdb);
60 }
61
62 /* Used to trigger a dummy event once per second, to make
63  * detection of hangs more reliable.
64  */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
66 {
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72
73 /* called when the "startup" event script has finished */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
75 {
76         if (ctdb->methods == NULL) {
77                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
79         }
80
81         /* start the transport running */
82         if (ctdb->methods->start(ctdb) != 0) {
83                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84                 ctdb_fatal(ctdb, "transport failed to start");
85         }
86
87         /* start the recovery daemon process */
88         if (ctdb_start_recoverd(ctdb) != 0) {
89                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
90                 exit(11);
91         }
92
93         /* Make sure we log something when the daemon terminates */
94         atexit(print_exit_message);
95
96         /* start monitoring for connected/disconnected nodes */
97         ctdb_start_keepalive(ctdb);
98
99         /* start monitoring for node health */
100         ctdb_start_monitoring(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void block_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
146                                     TDB_DATA data, void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message, data) + data.dsize;
154         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
155                                len, struct ctdb_req_message);
156         CTDB_NO_MEMORY_VOID(ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
182         if (res != 0) {
183                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
184                          (unsigned long long)srvid));
185         } else {
186                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
187                          (unsigned long long)srvid));
188         }
189
190         return res;
191 }
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   remove a srvid from the client
196  */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200         if (client == NULL) {
201                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202                 return -1;
203         }
204         return ctdb_deregister_message_handler(ctdb, srvid, client);
205 }
206
207
208 /*
209   destroy a ctdb_client
210 */
211 static int ctdb_client_destructor(struct ctdb_client *client)
212 {
213         struct ctdb_db_context *ctdb_db;
214
215         ctdb_takeover_client_destructor_hook(client);
216         ctdb_reqid_remove(client->ctdb, client->client_id);
217         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
218
219         if (client->num_persistent_updates != 0) {
220                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
221                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
222         }
223         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
224         if (ctdb_db) {
225                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
226                                   "commit active. Forcing recovery.\n"));
227                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
228
229                 /* legacy trans2 transaction state: */
230                 ctdb_db->transaction_active = false;
231
232                 /*
233                  * trans3 transaction state:
234                  *
235                  * The destructor sets the pointer to NULL.
236                  */
237                 talloc_free(ctdb_db->persistent_state);
238         }
239
240         return 0;
241 }
242
243
244 /*
245   this is called when the ctdb daemon received a ctdb request message
246   from a local client over the unix domain socket
247  */
248 static void daemon_request_message_from_client(struct ctdb_client *client, 
249                                                struct ctdb_req_message *c)
250 {
251         TDB_DATA data;
252         int res;
253
254         /* maybe the message is for another client on this node */
255         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
256                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
257                 return;
258         }
259
260         /* its for a remote node */
261         data.dptr = &c->data[0];
262         data.dsize = c->datalen;
263         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
264                                        c->srvid, data);
265         if (res != 0) {
266                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
267                          c->hdr.destnode));
268         }
269 }
270
271
272 struct daemon_call_state {
273         struct ctdb_client *client;
274         uint32_t reqid;
275         struct ctdb_call *call;
276         struct timeval start_time;
277 };
278
279 /* 
280    complete a call from a client 
281 */
282 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
283 {
284         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
285                                                            struct daemon_call_state);
286         struct ctdb_reply_call *r;
287         int res;
288         uint32_t length;
289         struct ctdb_client *client = dstate->client;
290         struct ctdb_db_context *ctdb_db = state->ctdb_db;
291
292         talloc_steal(client, dstate);
293         talloc_steal(dstate, dstate->call);
294
295         res = ctdb_daemon_call_recv(state, dstate->call);
296         if (res != 0) {
297                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
298                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
299
300                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
301                 return;
302         }
303
304         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
305         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
306                                length, struct ctdb_reply_call);
307         if (r == NULL) {
308                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
309                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
310                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
311                 return;
312         }
313         r->hdr.reqid        = dstate->reqid;
314         r->datalen          = dstate->call->reply_data.dsize;
315         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
316
317         res = daemon_queue_send(client, &r->hdr);
318         if (res == -1) {
319                 /* client is dead - return immediately */
320                 return;
321         }
322         if (res != 0) {
323                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
324         }
325         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
326         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
327         talloc_free(dstate);
328 }
329
330 struct ctdb_daemon_packet_wrap {
331         struct ctdb_context *ctdb;
332         uint32_t client_id;
333 };
334
335 /*
336   a wrapper to catch disconnected clients
337  */
338 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
339 {
340         struct ctdb_client *client;
341         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
342                                                             struct ctdb_daemon_packet_wrap);
343         if (w == NULL) {
344                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
345                 return;
346         }
347
348         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
349         if (client == NULL) {
350                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
351                          w->client_id));
352                 talloc_free(w);
353                 return;
354         }
355         talloc_free(w);
356
357         /* process it */
358         daemon_incoming_packet(client, hdr);    
359 }
360
361
362 /*
363   this is called when the ctdb daemon received a ctdb request call
364   from a local client over the unix domain socket
365  */
366 static void daemon_request_call_from_client(struct ctdb_client *client, 
367                                             struct ctdb_req_call *c)
368 {
369         struct ctdb_call_state *state;
370         struct ctdb_db_context *ctdb_db;
371         struct daemon_call_state *dstate;
372         struct ctdb_call *call;
373         struct ctdb_ltdb_header header;
374         TDB_DATA key, data;
375         int ret;
376         struct ctdb_context *ctdb = client->ctdb;
377         struct ctdb_daemon_packet_wrap *w;
378
379         CTDB_INCREMENT_STAT(ctdb, total_calls);
380         CTDB_DECREMENT_STAT(ctdb, pending_calls);
381
382         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
383         if (!ctdb_db) {
384                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
385                           c->db_id));
386                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
387                 return;
388         }
389
390         if (ctdb_db->unhealthy_reason) {
391                 /*
392                  * this is just a warning, as the tdb should be empty anyway,
393                  * and only persistent databases can be unhealthy, which doesn't
394                  * use this code patch
395                  */
396                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
397                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
398         }
399
400         key.dptr = c->data;
401         key.dsize = c->keylen;
402
403         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
404         CTDB_NO_MEMORY_VOID(ctdb, w);   
405
406         w->ctdb = ctdb;
407         w->client_id = client->client_id;
408
409         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
410                                            (struct ctdb_req_header *)c, &data,
411                                            daemon_incoming_packet_wrap, w, True);
412         if (ret == -2) {
413                 /* will retry later */
414                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
415                 return;
416         }
417
418         talloc_free(w);
419
420         if (ret != 0) {
421                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
422                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
423                 return;
424         }
425
426         dstate = talloc(client, struct daemon_call_state);
427         if (dstate == NULL) {
428                 ret = ctdb_ltdb_unlock(ctdb_db, key);
429                 if (ret != 0) {
430                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
431                 }
432
433                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
434                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
435                 return;
436         }
437         dstate->start_time = timeval_current();
438         dstate->client = client;
439         dstate->reqid  = c->hdr.reqid;
440         talloc_steal(dstate, data.dptr);
441
442         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
443         if (call == NULL) {
444                 ret = ctdb_ltdb_unlock(ctdb_db, key);
445                 if (ret != 0) {
446                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
447                 }
448
449                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
450                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
451                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
452                 return;
453         }
454
455         call->call_id = c->callid;
456         call->key = key;
457         call->call_data.dptr = c->data + c->keylen;
458         call->call_data.dsize = c->calldatalen;
459         call->flags = c->flags;
460
461         if (header.dmaster == ctdb->pnn) {
462                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
463         } else {
464                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
465         }
466
467         ret = ctdb_ltdb_unlock(ctdb_db, key);
468         if (ret != 0) {
469                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
470         }
471
472         if (state == NULL) {
473                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
474                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
475                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
476                 return;
477         }
478         talloc_steal(state, dstate);
479         talloc_steal(client, state);
480
481         state->async.fn = daemon_call_from_client_callback;
482         state->async.private_data = dstate;
483 }
484
485
486 static void daemon_request_control_from_client(struct ctdb_client *client, 
487                                                struct ctdb_req_control *c);
488
489 /* data contains a packet from the client */
490 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
491 {
492         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
493         TALLOC_CTX *tmp_ctx;
494         struct ctdb_context *ctdb = client->ctdb;
495
496         /* place the packet as a child of a tmp_ctx. We then use
497            talloc_free() below to free it. If any of the calls want
498            to keep it, then they will steal it somewhere else, and the
499            talloc_free() will be a no-op */
500         tmp_ctx = talloc_new(client);
501         talloc_steal(tmp_ctx, hdr);
502
503         if (hdr->ctdb_magic != CTDB_MAGIC) {
504                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
505                 goto done;
506         }
507
508         if (hdr->ctdb_version != CTDB_VERSION) {
509                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
510                 goto done;
511         }
512
513         switch (hdr->operation) {
514         case CTDB_REQ_CALL:
515                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
516                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
517                 break;
518
519         case CTDB_REQ_MESSAGE:
520                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
521                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
522                 break;
523
524         case CTDB_REQ_CONTROL:
525                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
526                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
527                 break;
528
529         default:
530                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
531                          hdr->operation));
532         }
533
534 done:
535         talloc_free(tmp_ctx);
536 }
537
538 /*
539   called when the daemon gets a incoming packet
540  */
541 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
542 {
543         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
544         struct ctdb_req_header *hdr;
545
546         if (cnt == 0) {
547                 talloc_free(client);
548                 return;
549         }
550
551         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
552
553         if (cnt < sizeof(*hdr)) {
554                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
555                                (unsigned)cnt);
556                 return;
557         }
558         hdr = (struct ctdb_req_header *)data;
559         if (cnt != hdr->length) {
560                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
561                                (unsigned)hdr->length, (unsigned)cnt);
562                 return;
563         }
564
565         if (hdr->ctdb_magic != CTDB_MAGIC) {
566                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
567                 return;
568         }
569
570         if (hdr->ctdb_version != CTDB_VERSION) {
571                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
572                 return;
573         }
574
575         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
576                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
577                  hdr->srcnode, hdr->destnode));
578
579         /* it is the responsibility of the incoming packet function to free 'data' */
580         daemon_incoming_packet(client, hdr);
581 }
582
583
584 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
585 {
586         if (client_pid->ctdb->client_pids != NULL) {
587                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
588         }
589
590         return 0;
591 }
592
593
594 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
595                          uint16_t flags, void *private_data)
596 {
597         struct sockaddr_un addr;
598         socklen_t len;
599         int fd;
600         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
601         struct ctdb_client *client;
602         struct ctdb_client_pid_list *client_pid;
603 #ifdef _AIX
604         struct peercred_struct cr;
605         socklen_t crl = sizeof(struct peercred_struct);
606 #else
607         struct ucred cr;
608         socklen_t crl = sizeof(struct ucred);
609 #endif
610
611         memset(&addr, 0, sizeof(addr));
612         len = sizeof(addr);
613         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
614         if (fd == -1) {
615                 return;
616         }
617
618         set_nonblocking(fd);
619         set_close_on_exec(fd);
620
621         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
622
623         client = talloc_zero(ctdb, struct ctdb_client);
624 #ifdef _AIX
625         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
626 #else
627         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
628 #endif
629                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
630         }
631
632         client->ctdb = ctdb;
633         client->fd = fd;
634         client->client_id = ctdb_reqid_new(ctdb, client);
635         client->pid = cr.pid;
636
637         client_pid = talloc(client, struct ctdb_client_pid_list);
638         if (client_pid == NULL) {
639                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
640                 close(fd);
641                 talloc_free(client);
642                 return;
643         }               
644         client_pid->ctdb   = ctdb;
645         client_pid->pid    = cr.pid;
646         client_pid->client = client;
647
648         DLIST_ADD(ctdb->client_pids, client_pid);
649
650         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
651                                          ctdb_daemon_read_cb, client,
652                                          "client-%u", client->pid);
653
654         talloc_set_destructor(client, ctdb_client_destructor);
655         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
656         CTDB_INCREMENT_STAT(ctdb, num_clients);
657 }
658
659
660
661 /*
662   create a unix domain socket and bind it
663   return a file descriptor open on the socket 
664 */
665 static int ux_socket_bind(struct ctdb_context *ctdb)
666 {
667         struct sockaddr_un addr;
668
669         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
670         if (ctdb->daemon.sd == -1) {
671                 return -1;
672         }
673
674         set_close_on_exec(ctdb->daemon.sd);
675         set_nonblocking(ctdb->daemon.sd);
676
677         memset(&addr, 0, sizeof(addr));
678         addr.sun_family = AF_UNIX;
679         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
680
681         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
682                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
683                 goto failed;
684         }       
685
686         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
687             chmod(ctdb->daemon.name, 0700) != 0) {
688                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
689                 goto failed;
690         } 
691
692
693         if (listen(ctdb->daemon.sd, 100) != 0) {
694                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
695                 goto failed;
696         }
697
698         return 0;
699
700 failed:
701         close(ctdb->daemon.sd);
702         ctdb->daemon.sd = -1;
703         return -1;      
704 }
705
706 static void sig_child_handler(struct event_context *ev,
707         struct signal_event *se, int signum, int count,
708         void *dont_care, 
709         void *private_data)
710 {
711 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
712         int status;
713         pid_t pid = -1;
714
715         while (pid != 0) {
716                 pid = waitpid(-1, &status, WNOHANG);
717                 if (pid == -1) {
718                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
719                         return;
720                 }
721                 if (pid > 0) {
722                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
723                 }
724         }
725 }
726
727 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
728                                       void *private_data)
729 {
730         if (status != 0) {
731                 ctdb_fatal(ctdb, "Failed to run setup event\n");
732                 return;
733         }
734         ctdb_run_notification_script(ctdb, "setup");
735
736         /* tell all other nodes we've just started up */
737         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
738                                  0, CTDB_CONTROL_STARTUP, 0,
739                                  CTDB_CTRL_FLAG_NOREPLY,
740                                  tdb_null, NULL, NULL);
741 }
742
743 /*
744   start the protocol going as a daemon
745 */
746 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
747 {
748         int res, ret = -1;
749         struct fd_event *fde;
750         const char *domain_socket_name;
751         struct signal_event *se;
752
753         /* get rid of any old sockets */
754         unlink(ctdb->daemon.name);
755
756         /* create a unix domain stream socket to listen to */
757         res = ux_socket_bind(ctdb);
758         if (res!=0) {
759                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
760                 exit(10);
761         }
762
763         if (do_fork && fork()) {
764                 return 0;
765         }
766
767         tdb_reopen_all(False);
768
769         if (do_fork) {
770                 setsid();
771                 close(0);
772                 if (open("/dev/null", O_RDONLY) != 0) {
773                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
774                         exit(11);
775                 }
776         }
777         block_signal(SIGPIPE);
778
779         ctdbd_pid = getpid();
780
781
782         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
783
784         if (ctdb->do_setsched) {
785                 /* try to set us up as realtime */
786                 ctdb_set_scheduler(ctdb);
787         }
788
789         /* ensure the socket is deleted on exit of the daemon */
790         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
791         if (domain_socket_name == NULL) {
792                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
793                 exit(12);
794         }
795
796         ctdb->ev = event_context_init(NULL);
797         tevent_loop_allow_nesting(ctdb->ev);
798         ret = ctdb_init_tevent_logging(ctdb);
799         if (ret != 0) {
800                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
801                 exit(1);
802         }
803
804         ctdb_set_child_logging(ctdb);
805
806         /* initialize statistics collection */
807         ctdb_statistics_init(ctdb);
808
809         /* force initial recovery for election */
810         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
811
812         if (strcmp(ctdb->transport, "tcp") == 0) {
813                 int ctdb_tcp_init(struct ctdb_context *);
814                 ret = ctdb_tcp_init(ctdb);
815         }
816 #ifdef USE_INFINIBAND
817         if (strcmp(ctdb->transport, "ib") == 0) {
818                 int ctdb_ibw_init(struct ctdb_context *);
819                 ret = ctdb_ibw_init(ctdb);
820         }
821 #endif
822         if (ret != 0) {
823                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
824                 return -1;
825         }
826
827         if (ctdb->methods == NULL) {
828                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
829                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
830         }
831
832         /* initialise the transport  */
833         if (ctdb->methods->initialise(ctdb) != 0) {
834                 ctdb_fatal(ctdb, "transport failed to initialise");
835         }
836         if (public_address_list) {
837                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
838                 if (ret == -1) {
839                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
840                         exit(1);
841                 }
842         }
843
844
845         /* attach to existing databases */
846         if (ctdb_attach_databases(ctdb) != 0) {
847                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
848         }
849
850         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
851         if (ret != 0) {
852                 ctdb_fatal(ctdb, "Failed to run init event\n");
853         }
854         ctdb_run_notification_script(ctdb, "init");
855
856         /* start frozen, then let the first election sort things out */
857         if (ctdb_blocking_freeze(ctdb)) {
858                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
859         }
860
861         /* now start accepting clients, only can do this once frozen */
862         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
863                            EVENT_FD_READ,
864                            ctdb_accept_client, ctdb);
865         tevent_fd_set_auto_close(fde);
866
867         /* release any IPs we hold from previous runs of the daemon */
868         if (ctdb->tunable.disable_ip_failover == 0) {
869                 ctdb_release_all_ips(ctdb);
870         }
871
872         /* start the transport going */
873         ctdb_start_transport(ctdb);
874
875         /* set up a handler to pick up sigchld */
876         se = event_add_signal(ctdb->ev, ctdb,
877                                      SIGCHLD, 0,
878                                      sig_child_handler,
879                                      ctdb);
880         if (se == NULL) {
881                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
882                 exit(1);
883         }
884
885         ret = ctdb_event_script_callback(ctdb,
886                                          ctdb,
887                                          ctdb_setup_event_callback,
888                                          ctdb,
889                                          false,
890                                          CTDB_EVENT_SETUP,
891                                          "");
892         if (ret != 0) {
893                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
894                 exit(1);
895         }
896
897         if (use_syslog) {
898                 if (start_syslog_daemon(ctdb)) {
899                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
900                         exit(10);
901                 }
902         }
903
904         ctdb_lockdown_memory(ctdb);
905           
906         /* go into a wait loop to allow other nodes to complete */
907         event_loop_wait(ctdb->ev);
908
909         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
910         exit(1);
911 }
912
913 /*
914   allocate a packet for use in daemon<->daemon communication
915  */
916 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
917                                                  TALLOC_CTX *mem_ctx, 
918                                                  enum ctdb_operation operation, 
919                                                  size_t length, size_t slength,
920                                                  const char *type)
921 {
922         int size;
923         struct ctdb_req_header *hdr;
924
925         length = MAX(length, slength);
926         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
927
928         if (ctdb->methods == NULL) {
929                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
930                          operation, (unsigned)length));
931                 return NULL;
932         }
933
934         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
935         if (hdr == NULL) {
936                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
937                          operation, (unsigned)length));
938                 return NULL;
939         }
940         talloc_set_name_const(hdr, type);
941         memset(hdr, 0, slength);
942         hdr->length       = length;
943         hdr->operation    = operation;
944         hdr->ctdb_magic   = CTDB_MAGIC;
945         hdr->ctdb_version = CTDB_VERSION;
946         hdr->generation   = ctdb->vnn_map->generation;
947         hdr->srcnode      = ctdb->pnn;
948
949         return hdr;     
950 }
951
952 struct daemon_control_state {
953         struct daemon_control_state *next, *prev;
954         struct ctdb_client *client;
955         struct ctdb_req_control *c;
956         uint32_t reqid;
957         struct ctdb_node *node;
958 };
959
960 /*
961   callback when a control reply comes in
962  */
963 static void daemon_control_callback(struct ctdb_context *ctdb,
964                                     int32_t status, TDB_DATA data, 
965                                     const char *errormsg,
966                                     void *private_data)
967 {
968         struct daemon_control_state *state = talloc_get_type(private_data, 
969                                                              struct daemon_control_state);
970         struct ctdb_client *client = state->client;
971         struct ctdb_reply_control *r;
972         size_t len;
973         int ret;
974
975         /* construct a message to send to the client containing the data */
976         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
977         if (errormsg) {
978                 len += strlen(errormsg);
979         }
980         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
981                                struct ctdb_reply_control);
982         CTDB_NO_MEMORY_VOID(ctdb, r);
983
984         r->hdr.reqid     = state->reqid;
985         r->status        = status;
986         r->datalen       = data.dsize;
987         r->errorlen = 0;
988         memcpy(&r->data[0], data.dptr, data.dsize);
989         if (errormsg) {
990                 r->errorlen = strlen(errormsg);
991                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
992         }
993
994         ret = daemon_queue_send(client, &r->hdr);
995         if (ret != -1) {
996                 talloc_free(state);
997         }
998 }
999
1000 /*
1001   fail all pending controls to a disconnected node
1002  */
1003 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1004 {
1005         struct daemon_control_state *state;
1006         while ((state = node->pending_controls)) {
1007                 DLIST_REMOVE(node->pending_controls, state);
1008                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1009                                         "node is disconnected", state);
1010         }
1011 }
1012
1013 /*
1014   destroy a daemon_control_state
1015  */
1016 static int daemon_control_destructor(struct daemon_control_state *state)
1017 {
1018         if (state->node) {
1019                 DLIST_REMOVE(state->node->pending_controls, state);
1020         }
1021         return 0;
1022 }
1023
1024 /*
1025   this is called when the ctdb daemon received a ctdb request control
1026   from a local client over the unix domain socket
1027  */
1028 static void daemon_request_control_from_client(struct ctdb_client *client, 
1029                                                struct ctdb_req_control *c)
1030 {
1031         TDB_DATA data;
1032         int res;
1033         struct daemon_control_state *state;
1034         TALLOC_CTX *tmp_ctx = talloc_new(client);
1035
1036         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1037                 c->hdr.destnode = client->ctdb->pnn;
1038         }
1039
1040         state = talloc(client, struct daemon_control_state);
1041         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1042
1043         state->client = client;
1044         state->c = talloc_steal(state, c);
1045         state->reqid = c->hdr.reqid;
1046         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1047                 state->node = client->ctdb->nodes[c->hdr.destnode];
1048                 DLIST_ADD(state->node->pending_controls, state);
1049         } else {
1050                 state->node = NULL;
1051         }
1052
1053         talloc_set_destructor(state, daemon_control_destructor);
1054
1055         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1056                 talloc_steal(tmp_ctx, state);
1057         }
1058         
1059         data.dptr = &c->data[0];
1060         data.dsize = c->datalen;
1061         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1062                                        c->srvid, c->opcode, client->client_id,
1063                                        c->flags,
1064                                        data, daemon_control_callback,
1065                                        state);
1066         if (res != 0) {
1067                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1068                          c->hdr.destnode));
1069         }
1070
1071         talloc_free(tmp_ctx);
1072 }
1073
1074 /*
1075   register a call function
1076 */
1077 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1078                          ctdb_fn_t fn, int id)
1079 {
1080         struct ctdb_registered_call *call;
1081         struct ctdb_db_context *ctdb_db;
1082
1083         ctdb_db = find_ctdb_db(ctdb, db_id);
1084         if (ctdb_db == NULL) {
1085                 return -1;
1086         }
1087
1088         call = talloc(ctdb_db, struct ctdb_registered_call);
1089         call->fn = fn;
1090         call->id = id;
1091
1092         DLIST_ADD(ctdb_db->calls, call);        
1093         return 0;
1094 }
1095
1096
1097
1098 /*
1099   this local messaging handler is ugly, but is needed to prevent
1100   recursion in ctdb_send_message() when the destination node is the
1101   same as the source node
1102  */
1103 struct ctdb_local_message {
1104         struct ctdb_context *ctdb;
1105         uint64_t srvid;
1106         TDB_DATA data;
1107 };
1108
1109 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1110                                        struct timeval t, void *private_data)
1111 {
1112         struct ctdb_local_message *m = talloc_get_type(private_data, 
1113                                                        struct ctdb_local_message);
1114         int res;
1115
1116         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1117         if (res != 0) {
1118                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1119                           (unsigned long long)m->srvid));
1120         }
1121         talloc_free(m);
1122 }
1123
1124 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1125 {
1126         struct ctdb_local_message *m;
1127         m = talloc(ctdb, struct ctdb_local_message);
1128         CTDB_NO_MEMORY(ctdb, m);
1129
1130         m->ctdb = ctdb;
1131         m->srvid = srvid;
1132         m->data  = data;
1133         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1134         if (m->data.dptr == NULL) {
1135                 talloc_free(m);
1136                 return -1;
1137         }
1138
1139         /* this needs to be done as an event to prevent recursion */
1140         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1141         return 0;
1142 }
1143
1144 /*
1145   send a ctdb message
1146 */
1147 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1148                              uint64_t srvid, TDB_DATA data)
1149 {
1150         struct ctdb_req_message *r;
1151         int len;
1152
1153         if (ctdb->methods == NULL) {
1154                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1155                 return -1;
1156         }
1157
1158         /* see if this is a message to ourselves */
1159         if (pnn == ctdb->pnn) {
1160                 return ctdb_local_message(ctdb, srvid, data);
1161         }
1162
1163         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1164         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1165                                     struct ctdb_req_message);
1166         CTDB_NO_MEMORY(ctdb, r);
1167
1168         r->hdr.destnode  = pnn;
1169         r->srvid         = srvid;
1170         r->datalen       = data.dsize;
1171         memcpy(&r->data[0], data.dptr, data.dsize);
1172
1173         ctdb_queue_packet(ctdb, &r->hdr);
1174
1175         talloc_free(r);
1176         return 0;
1177 }
1178
1179
1180
1181 struct ctdb_client_notify_list {
1182         struct ctdb_client_notify_list *next, *prev;
1183         struct ctdb_context *ctdb;
1184         uint64_t srvid;
1185         TDB_DATA data;
1186 };
1187
1188
1189 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1190 {
1191         int ret;
1192
1193         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1194
1195         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1196         if (ret != 0) {
1197                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1198         }
1199
1200         return 0;
1201 }
1202
1203 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1204 {
1205         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1206         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1207         struct ctdb_client_notify_list *nl;
1208
1209         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1210
1211         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1212                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1213                 return -1;
1214         }
1215
1216         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1217                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1218                 return -1;
1219         }
1220
1221
1222         if (client == NULL) {
1223                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1224                 return -1;
1225         }
1226
1227         for(nl=client->notify; nl; nl=nl->next) {
1228                 if (nl->srvid == notify->srvid) {
1229                         break;
1230                 }
1231         }
1232         if (nl != NULL) {
1233                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1234                 return -1;
1235         }
1236
1237         nl = talloc(client, struct ctdb_client_notify_list);
1238         CTDB_NO_MEMORY(ctdb, nl);
1239         nl->ctdb       = ctdb;
1240         nl->srvid      = notify->srvid;
1241         nl->data.dsize = notify->len;
1242         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1243         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1244         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1245         
1246         DLIST_ADD(client->notify, nl);
1247         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1248
1249         return 0;
1250 }
1251
1252 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1253 {
1254         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1255         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1256         struct ctdb_client_notify_list *nl;
1257
1258         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1259
1260         if (client == NULL) {
1261                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1262                 return -1;
1263         }
1264
1265         for(nl=client->notify; nl; nl=nl->next) {
1266                 if (nl->srvid == notify->srvid) {
1267                         break;
1268                 }
1269         }
1270         if (nl == NULL) {
1271                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1272                 return -1;
1273         }
1274
1275         DLIST_REMOVE(client->notify, nl);
1276         talloc_set_destructor(nl, NULL);
1277         talloc_free(nl);
1278
1279         return 0;
1280 }
1281
1282 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1283 {
1284         struct ctdb_client_pid_list *client_pid;
1285
1286         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1287                 if (client_pid->pid == pid) {
1288                         return client_pid->client;
1289                 }
1290         }
1291         return NULL;
1292 }
1293
1294
1295 /* This control is used by samba when probing if a process (of a samba daemon)
1296    exists on the node.
1297    Samba does this when it needs/wants to check if a subrecord in one of the
1298    databases is still valied, or if it is stale and can be removed.
1299    If the node is in unhealthy or stopped state we just kill of the samba
1300    process holding htis sub-record and return to the calling samba that
1301    the process does not exist.
1302    This allows us to forcefully recall subrecords registered by samba processes
1303    on banned and stopped nodes.
1304 */
1305 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1306 {
1307         struct ctdb_client *client;
1308
1309         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1310                 client = ctdb_find_client_by_pid(ctdb, pid);
1311                 if (client != NULL) {
1312                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1313                         talloc_free(client);
1314                 }
1315                 return -1;
1316         }
1317
1318         return kill(pid, 0);
1319 }