362f1cee5768a4fc098c59b36b3d25bb7973af07
[sahlberg/ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46
47
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
49                                   struct timeval t, void *private_data)
50 {
51         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
52
53         if (getpid() != ctdbd_pid) {
54                 return;
55         }
56
57         event_add_timed(ctdb->ev, ctdb, 
58                         timeval_current_ofs(1, 0), 
59                         ctdb_time_tick, ctdb);
60 }
61
62 /* Used to trigger a dummy event once per second, to make
63  * detection of hangs more reliable.
64  */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
66 {
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72
73 /* called when the "startup" event script has finished */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
75 {
76         if (ctdb->methods == NULL) {
77                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
79         }
80
81         /* start the transport running */
82         if (ctdb->methods->start(ctdb) != 0) {
83                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84                 ctdb_fatal(ctdb, "transport failed to start");
85         }
86
87         /* start the recovery daemon process */
88         if (ctdb_start_recoverd(ctdb) != 0) {
89                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
90                 exit(11);
91         }
92
93         /* Make sure we log something when the daemon terminates */
94         atexit(print_exit_message);
95
96         /* start monitoring for connected/disconnected nodes */
97         ctdb_start_keepalive(ctdb);
98
99         /* start monitoring for node health */
100         ctdb_start_monitoring(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void block_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
146                                     TDB_DATA data, void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message, data) + data.dsize;
154         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
155                                len, struct ctdb_req_message);
156         CTDB_NO_MEMORY_VOID(ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
182         if (res != 0) {
183                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
184                          (unsigned long long)srvid));
185         } else {
186                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
187                          (unsigned long long)srvid));
188         }
189
190         return res;
191 }
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   remove a srvid from the client
196  */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200         if (client == NULL) {
201                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202                 return -1;
203         }
204         return ctdb_deregister_message_handler(ctdb, srvid, client);
205 }
206
207
208 /*
209   destroy a ctdb_client
210 */
211 static int ctdb_client_destructor(struct ctdb_client *client)
212 {
213         struct ctdb_db_context *ctdb_db;
214
215         ctdb_takeover_client_destructor_hook(client);
216         ctdb_reqid_remove(client->ctdb, client->client_id);
217         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
218
219         if (client->num_persistent_updates != 0) {
220                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
221                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
222         }
223         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
224         if (ctdb_db) {
225                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
226                                   "commit active. Forcing recovery.\n"));
227                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
228                 ctdb_db->transaction_active = false;
229         }
230
231         return 0;
232 }
233
234
235 /*
236   this is called when the ctdb daemon received a ctdb request message
237   from a local client over the unix domain socket
238  */
239 static void daemon_request_message_from_client(struct ctdb_client *client, 
240                                                struct ctdb_req_message *c)
241 {
242         TDB_DATA data;
243         int res;
244
245         /* maybe the message is for another client on this node */
246         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
247                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
248                 return;
249         }
250
251         /* its for a remote node */
252         data.dptr = &c->data[0];
253         data.dsize = c->datalen;
254         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
255                                        c->srvid, data);
256         if (res != 0) {
257                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
258                          c->hdr.destnode));
259         }
260 }
261
262
263 struct daemon_call_state {
264         struct ctdb_client *client;
265         uint32_t reqid;
266         struct ctdb_call *call;
267         struct timeval start_time;
268 };
269
270 /* 
271    complete a call from a client 
272 */
273 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
274 {
275         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
276                                                            struct daemon_call_state);
277         struct ctdb_reply_call *r;
278         int res;
279         uint32_t length;
280         struct ctdb_client *client = dstate->client;
281         struct ctdb_db_context *ctdb_db = state->ctdb_db;
282
283         talloc_steal(client, dstate);
284         talloc_steal(dstate, dstate->call);
285
286         res = ctdb_daemon_call_recv(state, dstate->call);
287         if (res != 0) {
288                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
289                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
290
291                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
292                 return;
293         }
294
295         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
296         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
297                                length, struct ctdb_reply_call);
298         if (r == NULL) {
299                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
300                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
301                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
302                 return;
303         }
304         r->hdr.reqid        = dstate->reqid;
305         r->datalen          = dstate->call->reply_data.dsize;
306         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
307
308         res = daemon_queue_send(client, &r->hdr);
309         if (res == -1) {
310                 /* client is dead - return immediately */
311                 return;
312         }
313         if (res != 0) {
314                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
315         }
316         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
317         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
318         talloc_free(dstate);
319 }
320
321 struct ctdb_daemon_packet_wrap {
322         struct ctdb_context *ctdb;
323         uint32_t client_id;
324 };
325
326 /*
327   a wrapper to catch disconnected clients
328  */
329 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
330 {
331         struct ctdb_client *client;
332         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
333                                                             struct ctdb_daemon_packet_wrap);
334         if (w == NULL) {
335                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
336                 return;
337         }
338
339         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
340         if (client == NULL) {
341                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
342                          w->client_id));
343                 talloc_free(w);
344                 return;
345         }
346         talloc_free(w);
347
348         /* process it */
349         daemon_incoming_packet(client, hdr);    
350 }
351
352
353 /*
354   this is called when the ctdb daemon received a ctdb request call
355   from a local client over the unix domain socket
356  */
357 static void daemon_request_call_from_client(struct ctdb_client *client, 
358                                             struct ctdb_req_call *c)
359 {
360         struct ctdb_call_state *state;
361         struct ctdb_db_context *ctdb_db;
362         struct daemon_call_state *dstate;
363         struct ctdb_call *call;
364         struct ctdb_ltdb_header header;
365         TDB_DATA key, data;
366         int ret;
367         struct ctdb_context *ctdb = client->ctdb;
368         struct ctdb_daemon_packet_wrap *w;
369
370         CTDB_INCREMENT_STAT(ctdb, total_calls);
371         CTDB_DECREMENT_STAT(ctdb, pending_calls);
372
373         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
374         if (!ctdb_db) {
375                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
376                           c->db_id));
377                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
378                 return;
379         }
380
381         if (ctdb_db->unhealthy_reason) {
382                 /*
383                  * this is just a warning, as the tdb should be empty anyway,
384                  * and only persistent databases can be unhealthy, which doesn't
385                  * use this code patch
386                  */
387                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
388                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
389         }
390
391         key.dptr = c->data;
392         key.dsize = c->keylen;
393
394         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
395         CTDB_NO_MEMORY_VOID(ctdb, w);   
396
397         w->ctdb = ctdb;
398         w->client_id = client->client_id;
399
400         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
401                                            (struct ctdb_req_header *)c, &data,
402                                            daemon_incoming_packet_wrap, w, True);
403         if (ret == -2) {
404                 /* will retry later */
405                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
406                 return;
407         }
408
409         talloc_free(w);
410
411         if (ret != 0) {
412                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
413                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
414                 return;
415         }
416
417         dstate = talloc(client, struct daemon_call_state);
418         if (dstate == NULL) {
419                 ret = ctdb_ltdb_unlock(ctdb_db, key);
420                 if (ret != 0) {
421                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
422                 }
423
424                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
425                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
426                 return;
427         }
428         dstate->start_time = timeval_current();
429         dstate->client = client;
430         dstate->reqid  = c->hdr.reqid;
431         talloc_steal(dstate, data.dptr);
432
433         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
434         if (call == NULL) {
435                 ret = ctdb_ltdb_unlock(ctdb_db, key);
436                 if (ret != 0) {
437                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
438                 }
439
440                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
441                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
442                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
443                 return;
444         }
445
446         call->call_id = c->callid;
447         call->key = key;
448         call->call_data.dptr = c->data + c->keylen;
449         call->call_data.dsize = c->calldatalen;
450         call->flags = c->flags;
451
452         if (header.dmaster == ctdb->pnn) {
453                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
454         } else {
455                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
456         }
457
458         ret = ctdb_ltdb_unlock(ctdb_db, key);
459         if (ret != 0) {
460                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
461         }
462
463         if (state == NULL) {
464                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
465                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
466                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
467                 return;
468         }
469         talloc_steal(state, dstate);
470         talloc_steal(client, state);
471
472         state->async.fn = daemon_call_from_client_callback;
473         state->async.private_data = dstate;
474 }
475
476
477 static void daemon_request_control_from_client(struct ctdb_client *client, 
478                                                struct ctdb_req_control *c);
479
480 /* data contains a packet from the client */
481 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
482 {
483         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
484         TALLOC_CTX *tmp_ctx;
485         struct ctdb_context *ctdb = client->ctdb;
486
487         /* place the packet as a child of a tmp_ctx. We then use
488            talloc_free() below to free it. If any of the calls want
489            to keep it, then they will steal it somewhere else, and the
490            talloc_free() will be a no-op */
491         tmp_ctx = talloc_new(client);
492         talloc_steal(tmp_ctx, hdr);
493
494         if (hdr->ctdb_magic != CTDB_MAGIC) {
495                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
496                 goto done;
497         }
498
499         if (hdr->ctdb_version != CTDB_VERSION) {
500                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
501                 goto done;
502         }
503
504         switch (hdr->operation) {
505         case CTDB_REQ_CALL:
506                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
507                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
508                 break;
509
510         case CTDB_REQ_MESSAGE:
511                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
512                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
513                 break;
514
515         case CTDB_REQ_CONTROL:
516                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
517                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
518                 break;
519
520         default:
521                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
522                          hdr->operation));
523         }
524
525 done:
526         talloc_free(tmp_ctx);
527 }
528
529 /*
530   called when the daemon gets a incoming packet
531  */
532 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
533 {
534         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
535         struct ctdb_req_header *hdr;
536
537         if (cnt == 0) {
538                 talloc_free(client);
539                 return;
540         }
541
542         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
543
544         if (cnt < sizeof(*hdr)) {
545                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
546                                (unsigned)cnt);
547                 return;
548         }
549         hdr = (struct ctdb_req_header *)data;
550         if (cnt != hdr->length) {
551                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
552                                (unsigned)hdr->length, (unsigned)cnt);
553                 return;
554         }
555
556         if (hdr->ctdb_magic != CTDB_MAGIC) {
557                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
558                 return;
559         }
560
561         if (hdr->ctdb_version != CTDB_VERSION) {
562                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
563                 return;
564         }
565
566         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
567                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
568                  hdr->srcnode, hdr->destnode));
569
570         /* it is the responsibility of the incoming packet function to free 'data' */
571         daemon_incoming_packet(client, hdr);
572 }
573
574
575 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
576 {
577         if (client_pid->ctdb->client_pids != NULL) {
578                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
579         }
580
581         return 0;
582 }
583
584
585 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
586                          uint16_t flags, void *private_data)
587 {
588         struct sockaddr_un addr;
589         socklen_t len;
590         int fd;
591         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
592         struct ctdb_client *client;
593         struct ctdb_client_pid_list *client_pid;
594 #ifdef _AIX
595         struct peercred_struct cr;
596         socklen_t crl = sizeof(struct peercred_struct);
597 #else
598         struct ucred cr;
599         socklen_t crl = sizeof(struct ucred);
600 #endif
601
602         memset(&addr, 0, sizeof(addr));
603         len = sizeof(addr);
604         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
605         if (fd == -1) {
606                 return;
607         }
608
609         set_nonblocking(fd);
610         set_close_on_exec(fd);
611
612         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
613
614         client = talloc_zero(ctdb, struct ctdb_client);
615 #ifdef _AIX
616         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
617 #else
618         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
619 #endif
620                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
621         }
622
623         client->ctdb = ctdb;
624         client->fd = fd;
625         client->client_id = ctdb_reqid_new(ctdb, client);
626         client->pid = cr.pid;
627
628         client_pid = talloc(client, struct ctdb_client_pid_list);
629         if (client_pid == NULL) {
630                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
631                 close(fd);
632                 talloc_free(client);
633                 return;
634         }               
635         client_pid->ctdb   = ctdb;
636         client_pid->pid    = cr.pid;
637         client_pid->client = client;
638
639         DLIST_ADD(ctdb->client_pids, client_pid);
640
641         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
642                                          ctdb_daemon_read_cb, client,
643                                          "client-%u", client->pid);
644
645         talloc_set_destructor(client, ctdb_client_destructor);
646         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
647         CTDB_INCREMENT_STAT(ctdb, num_clients);
648 }
649
650
651
652 /*
653   create a unix domain socket and bind it
654   return a file descriptor open on the socket 
655 */
656 static int ux_socket_bind(struct ctdb_context *ctdb)
657 {
658         struct sockaddr_un addr;
659
660         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
661         if (ctdb->daemon.sd == -1) {
662                 return -1;
663         }
664
665         set_close_on_exec(ctdb->daemon.sd);
666         set_nonblocking(ctdb->daemon.sd);
667
668         memset(&addr, 0, sizeof(addr));
669         addr.sun_family = AF_UNIX;
670         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
671
672         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
673                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
674                 goto failed;
675         }       
676
677         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
678             chmod(ctdb->daemon.name, 0700) != 0) {
679                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
680                 goto failed;
681         } 
682
683
684         if (listen(ctdb->daemon.sd, 100) != 0) {
685                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
686                 goto failed;
687         }
688
689         return 0;
690
691 failed:
692         close(ctdb->daemon.sd);
693         ctdb->daemon.sd = -1;
694         return -1;      
695 }
696
697 static void sig_child_handler(struct event_context *ev,
698         struct signal_event *se, int signum, int count,
699         void *dont_care, 
700         void *private_data)
701 {
702 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
703         int status;
704         pid_t pid = -1;
705
706         while (pid != 0) {
707                 pid = waitpid(-1, &status, WNOHANG);
708                 if (pid == -1) {
709                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
710                         return;
711                 }
712                 if (pid > 0) {
713                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
714                 }
715         }
716 }
717
718 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
719                                       void *private_data)
720 {
721         if (status != 0) {
722                 ctdb_fatal(ctdb, "Failed to run setup event\n");
723                 return;
724         }
725         ctdb_run_notification_script(ctdb, "setup");
726
727         /* tell all other nodes we've just started up */
728         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
729                                  0, CTDB_CONTROL_STARTUP, 0,
730                                  CTDB_CTRL_FLAG_NOREPLY,
731                                  tdb_null, NULL, NULL);
732 }
733
734 /*
735   start the protocol going as a daemon
736 */
737 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
738 {
739         int res, ret = -1;
740         struct fd_event *fde;
741         const char *domain_socket_name;
742         struct signal_event *se;
743
744         /* get rid of any old sockets */
745         unlink(ctdb->daemon.name);
746
747         /* create a unix domain stream socket to listen to */
748         res = ux_socket_bind(ctdb);
749         if (res!=0) {
750                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
751                 exit(10);
752         }
753
754         if (do_fork && fork()) {
755                 return 0;
756         }
757
758         tdb_reopen_all(False);
759
760         if (do_fork) {
761                 setsid();
762                 close(0);
763                 if (open("/dev/null", O_RDONLY) != 0) {
764                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
765                         exit(11);
766                 }
767         }
768         block_signal(SIGPIPE);
769
770         ctdbd_pid = getpid();
771
772
773         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdbd_pid));
774
775         if (ctdb->do_setsched) {
776                 /* try to set us up as realtime */
777                 ctdb_set_scheduler(ctdb);
778         }
779
780         /* ensure the socket is deleted on exit of the daemon */
781         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
782         if (domain_socket_name == NULL) {
783                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
784                 exit(12);
785         }
786
787         ctdb->ev = event_context_init(NULL);
788         tevent_loop_allow_nesting(ctdb->ev);
789         ret = ctdb_init_tevent_logging(ctdb);
790         if (ret != 0) {
791                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
792                 exit(1);
793         }
794
795         ctdb_set_child_logging(ctdb);
796
797         /* initialize statistics collection */
798         ctdb_statistics_init(ctdb);
799
800         /* force initial recovery for election */
801         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
802
803         if (strcmp(ctdb->transport, "tcp") == 0) {
804                 int ctdb_tcp_init(struct ctdb_context *);
805                 ret = ctdb_tcp_init(ctdb);
806         }
807 #ifdef USE_INFINIBAND
808         if (strcmp(ctdb->transport, "ib") == 0) {
809                 int ctdb_ibw_init(struct ctdb_context *);
810                 ret = ctdb_ibw_init(ctdb);
811         }
812 #endif
813         if (ret != 0) {
814                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
815                 return -1;
816         }
817
818         if (ctdb->methods == NULL) {
819                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
820                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
821         }
822
823         /* initialise the transport  */
824         if (ctdb->methods->initialise(ctdb) != 0) {
825                 ctdb_fatal(ctdb, "transport failed to initialise");
826         }
827         if (public_address_list) {
828                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
829                 if (ret == -1) {
830                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
831                         exit(1);
832                 }
833         }
834
835
836         /* attach to existing databases */
837         if (ctdb_attach_databases(ctdb) != 0) {
838                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
839         }
840
841         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
842         if (ret != 0) {
843                 ctdb_fatal(ctdb, "Failed to run init event\n");
844         }
845         ctdb_run_notification_script(ctdb, "init");
846
847         /* start frozen, then let the first election sort things out */
848         if (ctdb_blocking_freeze(ctdb)) {
849                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
850         }
851
852         /* now start accepting clients, only can do this once frozen */
853         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
854                            EVENT_FD_READ,
855                            ctdb_accept_client, ctdb);
856         tevent_fd_set_auto_close(fde);
857
858         /* release any IPs we hold from previous runs of the daemon */
859         if (ctdb->tunable.disable_ip_failover == 0) {
860                 ctdb_release_all_ips(ctdb);
861         }
862
863         /* start the transport going */
864         ctdb_start_transport(ctdb);
865
866         /* set up a handler to pick up sigchld */
867         se = event_add_signal(ctdb->ev, ctdb,
868                                      SIGCHLD, 0,
869                                      sig_child_handler,
870                                      ctdb);
871         if (se == NULL) {
872                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
873                 exit(1);
874         }
875
876         ret = ctdb_event_script_callback(ctdb,
877                                          ctdb,
878                                          ctdb_setup_event_callback,
879                                          ctdb,
880                                          false,
881                                          CTDB_EVENT_SETUP,
882                                          "");
883         if (ret != 0) {
884                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
885                 exit(1);
886         }
887
888         if (use_syslog) {
889                 if (start_syslog_daemon(ctdb)) {
890                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
891                         exit(10);
892                 }
893         }
894
895         ctdb_lockdown_memory(ctdb);
896           
897         /* go into a wait loop to allow other nodes to complete */
898         event_loop_wait(ctdb->ev);
899
900         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
901         exit(1);
902 }
903
904 /*
905   allocate a packet for use in daemon<->daemon communication
906  */
907 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
908                                                  TALLOC_CTX *mem_ctx, 
909                                                  enum ctdb_operation operation, 
910                                                  size_t length, size_t slength,
911                                                  const char *type)
912 {
913         int size;
914         struct ctdb_req_header *hdr;
915
916         length = MAX(length, slength);
917         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
918
919         if (ctdb->methods == NULL) {
920                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
921                          operation, (unsigned)length));
922                 return NULL;
923         }
924
925         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
926         if (hdr == NULL) {
927                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
928                          operation, (unsigned)length));
929                 return NULL;
930         }
931         talloc_set_name_const(hdr, type);
932         memset(hdr, 0, slength);
933         hdr->length       = length;
934         hdr->operation    = operation;
935         hdr->ctdb_magic   = CTDB_MAGIC;
936         hdr->ctdb_version = CTDB_VERSION;
937         hdr->generation   = ctdb->vnn_map->generation;
938         hdr->srcnode      = ctdb->pnn;
939
940         return hdr;     
941 }
942
943 struct daemon_control_state {
944         struct daemon_control_state *next, *prev;
945         struct ctdb_client *client;
946         struct ctdb_req_control *c;
947         uint32_t reqid;
948         struct ctdb_node *node;
949 };
950
951 /*
952   callback when a control reply comes in
953  */
954 static void daemon_control_callback(struct ctdb_context *ctdb,
955                                     int32_t status, TDB_DATA data, 
956                                     const char *errormsg,
957                                     void *private_data)
958 {
959         struct daemon_control_state *state = talloc_get_type(private_data, 
960                                                              struct daemon_control_state);
961         struct ctdb_client *client = state->client;
962         struct ctdb_reply_control *r;
963         size_t len;
964         int ret;
965
966         /* construct a message to send to the client containing the data */
967         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
968         if (errormsg) {
969                 len += strlen(errormsg);
970         }
971         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
972                                struct ctdb_reply_control);
973         CTDB_NO_MEMORY_VOID(ctdb, r);
974
975         r->hdr.reqid     = state->reqid;
976         r->status        = status;
977         r->datalen       = data.dsize;
978         r->errorlen = 0;
979         memcpy(&r->data[0], data.dptr, data.dsize);
980         if (errormsg) {
981                 r->errorlen = strlen(errormsg);
982                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
983         }
984
985         ret = daemon_queue_send(client, &r->hdr);
986         if (ret != -1) {
987                 talloc_free(state);
988         }
989 }
990
991 /*
992   fail all pending controls to a disconnected node
993  */
994 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
995 {
996         struct daemon_control_state *state;
997         while ((state = node->pending_controls)) {
998                 DLIST_REMOVE(node->pending_controls, state);
999                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1000                                         "node is disconnected", state);
1001         }
1002 }
1003
1004 /*
1005   destroy a daemon_control_state
1006  */
1007 static int daemon_control_destructor(struct daemon_control_state *state)
1008 {
1009         if (state->node) {
1010                 DLIST_REMOVE(state->node->pending_controls, state);
1011         }
1012         return 0;
1013 }
1014
1015 /*
1016   this is called when the ctdb daemon received a ctdb request control
1017   from a local client over the unix domain socket
1018  */
1019 static void daemon_request_control_from_client(struct ctdb_client *client, 
1020                                                struct ctdb_req_control *c)
1021 {
1022         TDB_DATA data;
1023         int res;
1024         struct daemon_control_state *state;
1025         TALLOC_CTX *tmp_ctx = talloc_new(client);
1026
1027         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1028                 c->hdr.destnode = client->ctdb->pnn;
1029         }
1030
1031         state = talloc(client, struct daemon_control_state);
1032         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1033
1034         state->client = client;
1035         state->c = talloc_steal(state, c);
1036         state->reqid = c->hdr.reqid;
1037         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1038                 state->node = client->ctdb->nodes[c->hdr.destnode];
1039                 DLIST_ADD(state->node->pending_controls, state);
1040         } else {
1041                 state->node = NULL;
1042         }
1043
1044         talloc_set_destructor(state, daemon_control_destructor);
1045
1046         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1047                 talloc_steal(tmp_ctx, state);
1048         }
1049         
1050         data.dptr = &c->data[0];
1051         data.dsize = c->datalen;
1052         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1053                                        c->srvid, c->opcode, client->client_id,
1054                                        c->flags,
1055                                        data, daemon_control_callback,
1056                                        state);
1057         if (res != 0) {
1058                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1059                          c->hdr.destnode));
1060         }
1061
1062         talloc_free(tmp_ctx);
1063 }
1064
1065 /*
1066   register a call function
1067 */
1068 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1069                          ctdb_fn_t fn, int id)
1070 {
1071         struct ctdb_registered_call *call;
1072         struct ctdb_db_context *ctdb_db;
1073
1074         ctdb_db = find_ctdb_db(ctdb, db_id);
1075         if (ctdb_db == NULL) {
1076                 return -1;
1077         }
1078
1079         call = talloc(ctdb_db, struct ctdb_registered_call);
1080         call->fn = fn;
1081         call->id = id;
1082
1083         DLIST_ADD(ctdb_db->calls, call);        
1084         return 0;
1085 }
1086
1087
1088
1089 /*
1090   this local messaging handler is ugly, but is needed to prevent
1091   recursion in ctdb_send_message() when the destination node is the
1092   same as the source node
1093  */
1094 struct ctdb_local_message {
1095         struct ctdb_context *ctdb;
1096         uint64_t srvid;
1097         TDB_DATA data;
1098 };
1099
1100 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1101                                        struct timeval t, void *private_data)
1102 {
1103         struct ctdb_local_message *m = talloc_get_type(private_data, 
1104                                                        struct ctdb_local_message);
1105         int res;
1106
1107         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1108         if (res != 0) {
1109                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1110                           (unsigned long long)m->srvid));
1111         }
1112         talloc_free(m);
1113 }
1114
1115 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1116 {
1117         struct ctdb_local_message *m;
1118         m = talloc(ctdb, struct ctdb_local_message);
1119         CTDB_NO_MEMORY(ctdb, m);
1120
1121         m->ctdb = ctdb;
1122         m->srvid = srvid;
1123         m->data  = data;
1124         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1125         if (m->data.dptr == NULL) {
1126                 talloc_free(m);
1127                 return -1;
1128         }
1129
1130         /* this needs to be done as an event to prevent recursion */
1131         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1132         return 0;
1133 }
1134
1135 /*
1136   send a ctdb message
1137 */
1138 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1139                              uint64_t srvid, TDB_DATA data)
1140 {
1141         struct ctdb_req_message *r;
1142         int len;
1143
1144         if (ctdb->methods == NULL) {
1145                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1146                 return -1;
1147         }
1148
1149         /* see if this is a message to ourselves */
1150         if (pnn == ctdb->pnn) {
1151                 return ctdb_local_message(ctdb, srvid, data);
1152         }
1153
1154         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1155         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1156                                     struct ctdb_req_message);
1157         CTDB_NO_MEMORY(ctdb, r);
1158
1159         r->hdr.destnode  = pnn;
1160         r->srvid         = srvid;
1161         r->datalen       = data.dsize;
1162         memcpy(&r->data[0], data.dptr, data.dsize);
1163
1164         ctdb_queue_packet(ctdb, &r->hdr);
1165
1166         talloc_free(r);
1167         return 0;
1168 }
1169
1170
1171
1172 struct ctdb_client_notify_list {
1173         struct ctdb_client_notify_list *next, *prev;
1174         struct ctdb_context *ctdb;
1175         uint64_t srvid;
1176         TDB_DATA data;
1177 };
1178
1179
1180 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1181 {
1182         int ret;
1183
1184         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1185
1186         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1187         if (ret != 0) {
1188                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1189         }
1190
1191         return 0;
1192 }
1193
1194 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1195 {
1196         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1197         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1198         struct ctdb_client_notify_list *nl;
1199
1200         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1201
1202         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1203                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1204                 return -1;
1205         }
1206
1207         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1208                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1209                 return -1;
1210         }
1211
1212
1213         if (client == NULL) {
1214                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1215                 return -1;
1216         }
1217
1218         for(nl=client->notify; nl; nl=nl->next) {
1219                 if (nl->srvid == notify->srvid) {
1220                         break;
1221                 }
1222         }
1223         if (nl != NULL) {
1224                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1225                 return -1;
1226         }
1227
1228         nl = talloc(client, struct ctdb_client_notify_list);
1229         CTDB_NO_MEMORY(ctdb, nl);
1230         nl->ctdb       = ctdb;
1231         nl->srvid      = notify->srvid;
1232         nl->data.dsize = notify->len;
1233         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1234         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1235         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1236         
1237         DLIST_ADD(client->notify, nl);
1238         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1239
1240         return 0;
1241 }
1242
1243 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1244 {
1245         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1246         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1247         struct ctdb_client_notify_list *nl;
1248
1249         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1250
1251         if (client == NULL) {
1252                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1253                 return -1;
1254         }
1255
1256         for(nl=client->notify; nl; nl=nl->next) {
1257                 if (nl->srvid == notify->srvid) {
1258                         break;
1259                 }
1260         }
1261         if (nl == NULL) {
1262                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1263                 return -1;
1264         }
1265
1266         DLIST_REMOVE(client->notify, nl);
1267         talloc_set_destructor(nl, NULL);
1268         talloc_free(nl);
1269
1270         return 0;
1271 }
1272
1273 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1274 {
1275         struct ctdb_client_pid_list *client_pid;
1276
1277         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1278                 if (client_pid->pid == pid) {
1279                         return client_pid->client;
1280                 }
1281         }
1282         return NULL;
1283 }
1284
1285
1286 /* This control is used by samba when probing if a process (of a samba daemon)
1287    exists on the node.
1288    Samba does this when it needs/wants to check if a subrecord in one of the
1289    databases is still valied, or if it is stale and can be removed.
1290    If the node is in unhealthy or stopped state we just kill of the samba
1291    process holding htis sub-record and return to the calling samba that
1292    the process does not exist.
1293    This allows us to forcefully recall subrecords registered by samba processes
1294    on banned and stopped nodes.
1295 */
1296 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1297 {
1298         struct ctdb_client *client;
1299
1300         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1301                 client = ctdb_find_client_by_pid(ctdb, pid);
1302                 if (client != NULL) {
1303                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1304                         talloc_free(client);
1305                 }
1306                 return -1;
1307         }
1308
1309         return kill(pid, 0);
1310 }