8b9f2eb2ceb8e29e078d59a8c233773ad2a4619e
[ctdb.git] / server / ctdb_daemon.c
1 /* 
2    ctdb daemon code
3
4    Copyright (C) Andrew Tridgell  2006
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10    
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15    
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "db_wrap.h"
22 #include "lib/tdb/include/tdb.h"
23 #include "lib/tevent/tevent.h"
24 #include "lib/util/dlinklist.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include "system/wait.h"
28 #include "../include/ctdb_client.h"
29 #include "../include/ctdb_private.h"
30 #include <sys/socket.h>
31
32 struct ctdb_client_pid_list {
33         struct ctdb_client_pid_list *next, *prev;
34         struct ctdb_context *ctdb;
35         pid_t pid;
36         struct ctdb_client *client;
37 };
38
39 static void daemon_incoming_packet(void *, struct ctdb_req_header *);
40
41 static void print_exit_message(void)
42 {
43         DEBUG(DEBUG_NOTICE,("CTDB daemon shutting down\n"));
44 }
45
46
47
48 static void ctdb_time_tick(struct event_context *ev, struct timed_event *te, 
49                                   struct timeval t, void *private_data)
50 {
51         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
52
53         if (getpid() != ctdb->ctdbd_pid) {
54                 return;
55         }
56
57         event_add_timed(ctdb->ev, ctdb, 
58                         timeval_current_ofs(1, 0), 
59                         ctdb_time_tick, ctdb);
60 }
61
62 /* Used to trigger a dummy event once per second, to make
63  * detection of hangs more reliable.
64  */
65 static void ctdb_start_time_tickd(struct ctdb_context *ctdb)
66 {
67         event_add_timed(ctdb->ev, ctdb, 
68                         timeval_current_ofs(1, 0), 
69                         ctdb_time_tick, ctdb);
70 }
71
72
73 /* called when the "startup" event script has finished */
74 static void ctdb_start_transport(struct ctdb_context *ctdb)
75 {
76         if (ctdb->methods == NULL) {
77                 DEBUG(DEBUG_ALERT,(__location__ " startup event finished but transport is DOWN.\n"));
78                 ctdb_fatal(ctdb, "transport is not initialized but startup completed");
79         }
80
81         /* start the transport running */
82         if (ctdb->methods->start(ctdb) != 0) {
83                 DEBUG(DEBUG_ALERT,("transport failed to start!\n"));
84                 ctdb_fatal(ctdb, "transport failed to start");
85         }
86
87         /* start the recovery daemon process */
88         if (ctdb_start_recoverd(ctdb) != 0) {
89                 DEBUG(DEBUG_ALERT,("Failed to start recovery daemon\n"));
90                 exit(11);
91         }
92
93         /* Make sure we log something when the daemon terminates */
94         atexit(print_exit_message);
95
96         /* start monitoring for connected/disconnected nodes */
97         ctdb_start_keepalive(ctdb);
98
99         /* start monitoring for node health */
100         ctdb_start_monitoring(ctdb);
101
102         /* start periodic update of tcp tickle lists */
103         ctdb_start_tcp_tickle_update(ctdb);
104
105         /* start listening for recovery daemon pings */
106         ctdb_control_recd_ping(ctdb);
107
108         /* start listening to timer ticks */
109         ctdb_start_time_tickd(ctdb);
110 }
111
112 static void block_signal(int signum)
113 {
114         struct sigaction act;
115
116         memset(&act, 0, sizeof(act));
117
118         act.sa_handler = SIG_IGN;
119         sigemptyset(&act.sa_mask);
120         sigaddset(&act.sa_mask, signum);
121         sigaction(signum, &act, NULL);
122 }
123
124
125 /*
126   send a packet to a client
127  */
128 static int daemon_queue_send(struct ctdb_client *client, struct ctdb_req_header *hdr)
129 {
130         CTDB_INCREMENT_STAT(client->ctdb, client_packets_sent);
131         if (hdr->operation == CTDB_REQ_MESSAGE) {
132                 if (ctdb_queue_length(client->queue) > client->ctdb->tunable.max_queue_depth_drop_msg) {
133                         DEBUG(DEBUG_ERR,("CTDB_REQ_MESSAGE queue full - killing client connection.\n"));
134                         talloc_free(client);
135                         return -1;
136                 }
137         }
138         return ctdb_queue_send(client->queue, (uint8_t *)hdr, hdr->length);
139 }
140
141 /*
142   message handler for when we are in daemon mode. This redirects the message
143   to the right client
144  */
145 static void daemon_message_handler(struct ctdb_context *ctdb, uint64_t srvid, 
146                                     TDB_DATA data, void *private_data)
147 {
148         struct ctdb_client *client = talloc_get_type(private_data, struct ctdb_client);
149         struct ctdb_req_message *r;
150         int len;
151
152         /* construct a message to send to the client containing the data */
153         len = offsetof(struct ctdb_req_message, data) + data.dsize;
154         r = ctdbd_allocate_pkt(ctdb, ctdb, CTDB_REQ_MESSAGE, 
155                                len, struct ctdb_req_message);
156         CTDB_NO_MEMORY_VOID(ctdb, r);
157
158         talloc_set_name_const(r, "req_message packet");
159
160         r->srvid         = srvid;
161         r->datalen       = data.dsize;
162         memcpy(&r->data[0], data.dptr, data.dsize);
163
164         daemon_queue_send(client, &r->hdr);
165
166         talloc_free(r);
167 }
168
169 /*
170   this is called when the ctdb daemon received a ctdb request to 
171   set the srvid from the client
172  */
173 int daemon_register_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
174 {
175         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
176         int res;
177         if (client == NULL) {
178                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_register_message_handler\n"));
179                 return -1;
180         }
181         res = ctdb_register_message_handler(ctdb, client, srvid, daemon_message_handler, client);
182         if (res != 0) {
183                 DEBUG(DEBUG_ERR,(__location__ " Failed to register handler %llu in daemon\n", 
184                          (unsigned long long)srvid));
185         } else {
186                 DEBUG(DEBUG_INFO,(__location__ " Registered message handler for srvid=%llu\n", 
187                          (unsigned long long)srvid));
188         }
189
190         return res;
191 }
192
193 /*
194   this is called when the ctdb daemon received a ctdb request to 
195   remove a srvid from the client
196  */
197 int daemon_deregister_message_handler(struct ctdb_context *ctdb, uint32_t client_id, uint64_t srvid)
198 {
199         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
200         if (client == NULL) {
201                 DEBUG(DEBUG_ERR,("Bad client_id in daemon_request_deregister_message_handler\n"));
202                 return -1;
203         }
204         return ctdb_deregister_message_handler(ctdb, srvid, client);
205 }
206
207 int daemon_check_srvids(struct ctdb_context *ctdb, TDB_DATA indata,
208                         TDB_DATA *outdata)
209 {
210         uint64_t *ids;
211         int i, num_ids;
212         uint8_t *results;
213
214         if ((indata.dsize % sizeof(uint64_t)) != 0) {
215                 DEBUG(DEBUG_ERR, ("Bad indata in daemon_check_srvids, "
216                                   "size=%d\n", (int)indata.dsize));
217                 return -1;
218         }
219
220         ids = (uint64_t *)indata.dptr;
221         num_ids = indata.dsize / 8;
222
223         results = talloc_zero_array(outdata, uint8_t, (num_ids+7)/8);
224         if (results == NULL) {
225                 DEBUG(DEBUG_ERR, ("talloc failed in daemon_check_srvids\n"));
226                 return -1;
227         }
228         for (i=0; i<num_ids; i++) {
229                 struct ctdb_message_list *ml;
230                 for (ml=ctdb->message_list; ml; ml=ml->next) {
231                         if (ml->srvid == ids[i]) {
232                                 break;
233                         }
234                 }
235                 if (ml != NULL) {
236                         results[i/8] |= (1 << (i%8));
237                 }
238         }
239         outdata->dptr = (uint8_t *)results;
240         outdata->dsize = talloc_get_size(results);
241         return 0;
242 }
243
244 /*
245   destroy a ctdb_client
246 */
247 static int ctdb_client_destructor(struct ctdb_client *client)
248 {
249         struct ctdb_db_context *ctdb_db;
250
251         ctdb_takeover_client_destructor_hook(client);
252         ctdb_reqid_remove(client->ctdb, client->client_id);
253         CTDB_DECREMENT_STAT(client->ctdb, num_clients);
254
255         if (client->num_persistent_updates != 0) {
256                 DEBUG(DEBUG_ERR,(__location__ " Client disconnecting with %u persistent updates in flight. Starting recovery\n", client->num_persistent_updates));
257                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
258         }
259         ctdb_db = find_ctdb_db(client->ctdb, client->db_id);
260         if (ctdb_db) {
261                 DEBUG(DEBUG_ERR, (__location__ " client exit while transaction "
262                                   "commit active. Forcing recovery.\n"));
263                 client->ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
264
265                 /* legacy trans2 transaction state: */
266                 ctdb_db->transaction_active = false;
267
268                 /*
269                  * trans3 transaction state:
270                  *
271                  * The destructor sets the pointer to NULL.
272                  */
273                 talloc_free(ctdb_db->persistent_state);
274         }
275
276         return 0;
277 }
278
279
280 /*
281   this is called when the ctdb daemon received a ctdb request message
282   from a local client over the unix domain socket
283  */
284 static void daemon_request_message_from_client(struct ctdb_client *client, 
285                                                struct ctdb_req_message *c)
286 {
287         TDB_DATA data;
288         int res;
289
290         /* maybe the message is for another client on this node */
291         if (ctdb_get_pnn(client->ctdb)==c->hdr.destnode) {
292                 ctdb_request_message(client->ctdb, (struct ctdb_req_header *)c);
293                 return;
294         }
295
296         /* its for a remote node */
297         data.dptr = &c->data[0];
298         data.dsize = c->datalen;
299         res = ctdb_daemon_send_message(client->ctdb, c->hdr.destnode,
300                                        c->srvid, data);
301         if (res != 0) {
302                 DEBUG(DEBUG_ERR,(__location__ " Failed to send message to remote node %u\n",
303                          c->hdr.destnode));
304         }
305 }
306
307
308 struct daemon_call_state {
309         struct ctdb_client *client;
310         uint32_t reqid;
311         struct ctdb_call *call;
312         struct timeval start_time;
313 };
314
315 /* 
316    complete a call from a client 
317 */
318 static void daemon_call_from_client_callback(struct ctdb_call_state *state)
319 {
320         struct daemon_call_state *dstate = talloc_get_type(state->async.private_data, 
321                                                            struct daemon_call_state);
322         struct ctdb_reply_call *r;
323         int res;
324         uint32_t length;
325         struct ctdb_client *client = dstate->client;
326         struct ctdb_db_context *ctdb_db = state->ctdb_db;
327
328         talloc_steal(client, dstate);
329         talloc_steal(dstate, dstate->call);
330
331         res = ctdb_daemon_call_recv(state, dstate->call);
332         if (res != 0) {
333                 DEBUG(DEBUG_ERR, (__location__ " ctdbd_call_recv() returned error\n"));
334                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
335
336                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 1", call_latency, dstate->start_time);
337                 return;
338         }
339
340         length = offsetof(struct ctdb_reply_call, data) + dstate->call->reply_data.dsize;
341         r = ctdbd_allocate_pkt(client->ctdb, dstate, CTDB_REPLY_CALL, 
342                                length, struct ctdb_reply_call);
343         if (r == NULL) {
344                 DEBUG(DEBUG_ERR, (__location__ " Failed to allocate reply_call in ctdb daemon\n"));
345                 CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
346                 CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 2", call_latency, dstate->start_time);
347                 return;
348         }
349         r->hdr.reqid        = dstate->reqid;
350         r->datalen          = dstate->call->reply_data.dsize;
351         memcpy(&r->data[0], dstate->call->reply_data.dptr, r->datalen);
352
353         res = daemon_queue_send(client, &r->hdr);
354         if (res == -1) {
355                 /* client is dead - return immediately */
356                 return;
357         }
358         if (res != 0) {
359                 DEBUG(DEBUG_ERR, (__location__ " Failed to queue packet from daemon to client\n"));
360         }
361         CTDB_UPDATE_LATENCY(client->ctdb, ctdb_db, "call_from_client_cb 3", call_latency, dstate->start_time);
362         CTDB_DECREMENT_STAT(client->ctdb, pending_calls);
363         talloc_free(dstate);
364 }
365
366 struct ctdb_daemon_packet_wrap {
367         struct ctdb_context *ctdb;
368         uint32_t client_id;
369 };
370
371 /*
372   a wrapper to catch disconnected clients
373  */
374 static void daemon_incoming_packet_wrap(void *p, struct ctdb_req_header *hdr)
375 {
376         struct ctdb_client *client;
377         struct ctdb_daemon_packet_wrap *w = talloc_get_type(p, 
378                                                             struct ctdb_daemon_packet_wrap);
379         if (w == NULL) {
380                 DEBUG(DEBUG_CRIT,(__location__ " Bad packet type '%s'\n", talloc_get_name(p)));
381                 return;
382         }
383
384         client = ctdb_reqid_find(w->ctdb, w->client_id, struct ctdb_client);
385         if (client == NULL) {
386                 DEBUG(DEBUG_ERR,(__location__ " Packet for disconnected client %u\n",
387                          w->client_id));
388                 talloc_free(w);
389                 return;
390         }
391         talloc_free(w);
392
393         /* process it */
394         daemon_incoming_packet(client, hdr);    
395 }
396
397
398 /*
399   this is called when the ctdb daemon received a ctdb request call
400   from a local client over the unix domain socket
401  */
402 static void daemon_request_call_from_client(struct ctdb_client *client, 
403                                             struct ctdb_req_call *c)
404 {
405         struct ctdb_call_state *state;
406         struct ctdb_db_context *ctdb_db;
407         struct daemon_call_state *dstate;
408         struct ctdb_call *call;
409         struct ctdb_ltdb_header header;
410         TDB_DATA key, data;
411         int ret;
412         struct ctdb_context *ctdb = client->ctdb;
413         struct ctdb_daemon_packet_wrap *w;
414
415         CTDB_INCREMENT_STAT(ctdb, total_calls);
416         CTDB_DECREMENT_STAT(ctdb, pending_calls);
417
418         ctdb_db = find_ctdb_db(client->ctdb, c->db_id);
419         if (!ctdb_db) {
420                 DEBUG(DEBUG_ERR, (__location__ " Unknown database in request. db_id==0x%08x",
421                           c->db_id));
422                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
423                 return;
424         }
425
426         if (ctdb_db->unhealthy_reason) {
427                 /*
428                  * this is just a warning, as the tdb should be empty anyway,
429                  * and only persistent databases can be unhealthy, which doesn't
430                  * use this code patch
431                  */
432                 DEBUG(DEBUG_WARNING,("warn: db(%s) unhealty in daemon_request_call_from_client(): %s\n",
433                                      ctdb_db->db_name, ctdb_db->unhealthy_reason));
434         }
435
436         key.dptr = c->data;
437         key.dsize = c->keylen;
438
439         w = talloc(ctdb, struct ctdb_daemon_packet_wrap);
440         CTDB_NO_MEMORY_VOID(ctdb, w);   
441
442         w->ctdb = ctdb;
443         w->client_id = client->client_id;
444
445         ret = ctdb_ltdb_lock_fetch_requeue(ctdb_db, key, &header, 
446                                            (struct ctdb_req_header *)c, &data,
447                                            daemon_incoming_packet_wrap, w, True);
448         if (ret == -2) {
449                 /* will retry later */
450                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
451                 return;
452         }
453
454         talloc_free(w);
455
456         if (ret != 0) {
457                 DEBUG(DEBUG_ERR,(__location__ " Unable to fetch record\n"));
458                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
459                 return;
460         }
461
462         dstate = talloc(client, struct daemon_call_state);
463         if (dstate == NULL) {
464                 ret = ctdb_ltdb_unlock(ctdb_db, key);
465                 if (ret != 0) {
466                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
467                 }
468
469                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate dstate\n"));
470                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
471                 return;
472         }
473         dstate->start_time = timeval_current();
474         dstate->client = client;
475         dstate->reqid  = c->hdr.reqid;
476         talloc_steal(dstate, data.dptr);
477
478         call = dstate->call = talloc_zero(dstate, struct ctdb_call);
479         if (call == NULL) {
480                 ret = ctdb_ltdb_unlock(ctdb_db, key);
481                 if (ret != 0) {
482                         DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
483                 }
484
485                 DEBUG(DEBUG_ERR,(__location__ " Unable to allocate call\n"));
486                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
487                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 1", call_latency, dstate->start_time);
488                 return;
489         }
490
491         call->call_id = c->callid;
492         call->key = key;
493         call->call_data.dptr = c->data + c->keylen;
494         call->call_data.dsize = c->calldatalen;
495         call->flags = c->flags;
496
497         if (header.dmaster == ctdb->pnn) {
498                 state = ctdb_call_local_send(ctdb_db, call, &header, &data);
499         } else {
500                 state = ctdb_daemon_call_send_remote(ctdb_db, call, &header);
501         }
502
503         ret = ctdb_ltdb_unlock(ctdb_db, key);
504         if (ret != 0) {
505                 DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", ret));
506         }
507
508         if (state == NULL) {
509                 DEBUG(DEBUG_ERR,(__location__ " Unable to setup call send\n"));
510                 CTDB_DECREMENT_STAT(ctdb, pending_calls);
511                 CTDB_UPDATE_LATENCY(ctdb, ctdb_db, "call_from_client 2", call_latency, dstate->start_time);
512                 return;
513         }
514         talloc_steal(state, dstate);
515         talloc_steal(client, state);
516
517         state->async.fn = daemon_call_from_client_callback;
518         state->async.private_data = dstate;
519 }
520
521
522 static void daemon_request_control_from_client(struct ctdb_client *client, 
523                                                struct ctdb_req_control *c);
524
525 /* data contains a packet from the client */
526 static void daemon_incoming_packet(void *p, struct ctdb_req_header *hdr)
527 {
528         struct ctdb_client *client = talloc_get_type(p, struct ctdb_client);
529         TALLOC_CTX *tmp_ctx;
530         struct ctdb_context *ctdb = client->ctdb;
531
532         /* place the packet as a child of a tmp_ctx. We then use
533            talloc_free() below to free it. If any of the calls want
534            to keep it, then they will steal it somewhere else, and the
535            talloc_free() will be a no-op */
536         tmp_ctx = talloc_new(client);
537         talloc_steal(tmp_ctx, hdr);
538
539         if (hdr->ctdb_magic != CTDB_MAGIC) {
540                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected in daemon\n");
541                 goto done;
542         }
543
544         if (hdr->ctdb_version != CTDB_VERSION) {
545                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
546                 goto done;
547         }
548
549         switch (hdr->operation) {
550         case CTDB_REQ_CALL:
551                 CTDB_INCREMENT_STAT(ctdb, client.req_call);
552                 daemon_request_call_from_client(client, (struct ctdb_req_call *)hdr);
553                 break;
554
555         case CTDB_REQ_MESSAGE:
556                 CTDB_INCREMENT_STAT(ctdb, client.req_message);
557                 daemon_request_message_from_client(client, (struct ctdb_req_message *)hdr);
558                 break;
559
560         case CTDB_REQ_CONTROL:
561                 CTDB_INCREMENT_STAT(ctdb, client.req_control);
562                 daemon_request_control_from_client(client, (struct ctdb_req_control *)hdr);
563                 break;
564
565         default:
566                 DEBUG(DEBUG_CRIT,(__location__ " daemon: unrecognized operation %u\n",
567                          hdr->operation));
568         }
569
570 done:
571         talloc_free(tmp_ctx);
572 }
573
574 /*
575   called when the daemon gets a incoming packet
576  */
577 static void ctdb_daemon_read_cb(uint8_t *data, size_t cnt, void *args)
578 {
579         struct ctdb_client *client = talloc_get_type(args, struct ctdb_client);
580         struct ctdb_req_header *hdr;
581
582         if (cnt == 0) {
583                 talloc_free(client);
584                 return;
585         }
586
587         CTDB_INCREMENT_STAT(client->ctdb, client_packets_recv);
588
589         if (cnt < sizeof(*hdr)) {
590                 ctdb_set_error(client->ctdb, "Bad packet length %u in daemon\n", 
591                                (unsigned)cnt);
592                 return;
593         }
594         hdr = (struct ctdb_req_header *)data;
595         if (cnt != hdr->length) {
596                 ctdb_set_error(client->ctdb, "Bad header length %u expected %u\n in daemon", 
597                                (unsigned)hdr->length, (unsigned)cnt);
598                 return;
599         }
600
601         if (hdr->ctdb_magic != CTDB_MAGIC) {
602                 ctdb_set_error(client->ctdb, "Non CTDB packet rejected\n");
603                 return;
604         }
605
606         if (hdr->ctdb_version != CTDB_VERSION) {
607                 ctdb_set_error(client->ctdb, "Bad CTDB version 0x%x rejected in daemon\n", hdr->ctdb_version);
608                 return;
609         }
610
611         DEBUG(DEBUG_DEBUG,(__location__ " client request %u of type %u length %u from "
612                  "node %u to %u\n", hdr->reqid, hdr->operation, hdr->length,
613                  hdr->srcnode, hdr->destnode));
614
615         /* it is the responsibility of the incoming packet function to free 'data' */
616         daemon_incoming_packet(client, hdr);
617 }
618
619
620 static int ctdb_clientpid_destructor(struct ctdb_client_pid_list *client_pid)
621 {
622         if (client_pid->ctdb->client_pids != NULL) {
623                 DLIST_REMOVE(client_pid->ctdb->client_pids, client_pid);
624         }
625
626         return 0;
627 }
628
629
630 static void ctdb_accept_client(struct event_context *ev, struct fd_event *fde, 
631                          uint16_t flags, void *private_data)
632 {
633         struct sockaddr_un addr;
634         socklen_t len;
635         int fd;
636         struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
637         struct ctdb_client *client;
638         struct ctdb_client_pid_list *client_pid;
639 #ifdef _AIX
640         struct peercred_struct cr;
641         socklen_t crl = sizeof(struct peercred_struct);
642 #else
643         struct ucred cr;
644         socklen_t crl = sizeof(struct ucred);
645 #endif
646
647         memset(&addr, 0, sizeof(addr));
648         len = sizeof(addr);
649         fd = accept(ctdb->daemon.sd, (struct sockaddr *)&addr, &len);
650         if (fd == -1) {
651                 return;
652         }
653
654         set_nonblocking(fd);
655         set_close_on_exec(fd);
656
657         DEBUG(DEBUG_DEBUG,(__location__ " Created SOCKET FD:%d to connected child\n", fd));
658
659         client = talloc_zero(ctdb, struct ctdb_client);
660 #ifdef _AIX
661         if (getsockopt(fd, SOL_SOCKET, SO_PEERID, &cr, &crl) == 0) {
662 #else
663         if (getsockopt(fd, SOL_SOCKET, SO_PEERCRED, &cr, &crl) == 0) {
664 #endif
665                 DEBUG(DEBUG_INFO,("Connected client with pid:%u\n", (unsigned)cr.pid));
666         }
667
668         client->ctdb = ctdb;
669         client->fd = fd;
670         client->client_id = ctdb_reqid_new(ctdb, client);
671         client->pid = cr.pid;
672
673         client_pid = talloc(client, struct ctdb_client_pid_list);
674         if (client_pid == NULL) {
675                 DEBUG(DEBUG_ERR,("Failed to allocate client pid structure\n"));
676                 close(fd);
677                 talloc_free(client);
678                 return;
679         }               
680         client_pid->ctdb   = ctdb;
681         client_pid->pid    = cr.pid;
682         client_pid->client = client;
683
684         DLIST_ADD(ctdb->client_pids, client_pid);
685
686         client->queue = ctdb_queue_setup(ctdb, client, fd, CTDB_DS_ALIGNMENT, 
687                                          ctdb_daemon_read_cb, client,
688                                          "client-%u", client->pid);
689
690         talloc_set_destructor(client, ctdb_client_destructor);
691         talloc_set_destructor(client_pid, ctdb_clientpid_destructor);
692         CTDB_INCREMENT_STAT(ctdb, num_clients);
693 }
694
695
696
697 /*
698   create a unix domain socket and bind it
699   return a file descriptor open on the socket 
700 */
701 static int ux_socket_bind(struct ctdb_context *ctdb)
702 {
703         struct sockaddr_un addr;
704
705         ctdb->daemon.sd = socket(AF_UNIX, SOCK_STREAM, 0);
706         if (ctdb->daemon.sd == -1) {
707                 return -1;
708         }
709
710         set_close_on_exec(ctdb->daemon.sd);
711         set_nonblocking(ctdb->daemon.sd);
712
713         memset(&addr, 0, sizeof(addr));
714         addr.sun_family = AF_UNIX;
715         strncpy(addr.sun_path, ctdb->daemon.name, sizeof(addr.sun_path));
716
717         if (bind(ctdb->daemon.sd, (struct sockaddr *)&addr, sizeof(addr)) == -1) {
718                 DEBUG(DEBUG_CRIT,("Unable to bind on ctdb socket '%s'\n", ctdb->daemon.name));
719                 goto failed;
720         }       
721
722         if (chown(ctdb->daemon.name, geteuid(), getegid()) != 0 ||
723             chmod(ctdb->daemon.name, 0700) != 0) {
724                 DEBUG(DEBUG_CRIT,("Unable to secure ctdb socket '%s', ctdb->daemon.name\n", ctdb->daemon.name));
725                 goto failed;
726         } 
727
728
729         if (listen(ctdb->daemon.sd, 100) != 0) {
730                 DEBUG(DEBUG_CRIT,("Unable to listen on ctdb socket '%s'\n", ctdb->daemon.name));
731                 goto failed;
732         }
733
734         return 0;
735
736 failed:
737         close(ctdb->daemon.sd);
738         ctdb->daemon.sd = -1;
739         return -1;      
740 }
741
742 static void sig_child_handler(struct event_context *ev,
743         struct signal_event *se, int signum, int count,
744         void *dont_care, 
745         void *private_data)
746 {
747 //      struct ctdb_context *ctdb = talloc_get_type(private_data, struct ctdb_context);
748         int status;
749         pid_t pid = -1;
750
751         while (pid != 0) {
752                 pid = waitpid(-1, &status, WNOHANG);
753                 if (pid == -1) {
754                         DEBUG(DEBUG_ERR, (__location__ " waitpid() returned error. errno:%d\n", errno));
755                         return;
756                 }
757                 if (pid > 0) {
758                         DEBUG(DEBUG_DEBUG, ("SIGCHLD from %d\n", (int)pid));
759                 }
760         }
761 }
762
763 static void ctdb_setup_event_callback(struct ctdb_context *ctdb, int status,
764                                       void *private_data)
765 {
766         if (status != 0) {
767                 ctdb_fatal(ctdb, "Failed to run setup event\n");
768                 return;
769         }
770         ctdb_run_notification_script(ctdb, "setup");
771
772         /* tell all other nodes we've just started up */
773         ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL,
774                                  0, CTDB_CONTROL_STARTUP, 0,
775                                  CTDB_CTRL_FLAG_NOREPLY,
776                                  tdb_null, NULL, NULL);
777 }
778
779 /*
780   start the protocol going as a daemon
781 */
782 int ctdb_start_daemon(struct ctdb_context *ctdb, bool do_fork, bool use_syslog, const char *public_address_list)
783 {
784         int res, ret = -1;
785         struct fd_event *fde;
786         const char *domain_socket_name;
787         struct signal_event *se;
788
789         /* get rid of any old sockets */
790         unlink(ctdb->daemon.name);
791
792         /* create a unix domain stream socket to listen to */
793         res = ux_socket_bind(ctdb);
794         if (res!=0) {
795                 DEBUG(DEBUG_ALERT,(__location__ " Failed to open CTDB unix domain socket\n"));
796                 exit(10);
797         }
798
799         if (do_fork && fork()) {
800                 return 0;
801         }
802
803         tdb_reopen_all(False);
804
805         if (do_fork) {
806                 setsid();
807                 close(0);
808                 if (open("/dev/null", O_RDONLY) != 0) {
809                         DEBUG(DEBUG_ALERT,(__location__ " Failed to setup stdin on /dev/null\n"));
810                         exit(11);
811                 }
812         }
813         block_signal(SIGPIPE);
814
815         ctdb->ctdbd_pid = getpid();
816
817
818         DEBUG(DEBUG_ERR, ("Starting CTDBD as pid : %u\n", ctdb->ctdbd_pid));
819
820         if (ctdb->do_setsched) {
821                 /* try to set us up as realtime */
822                 ctdb_set_scheduler(ctdb);
823         }
824
825         /* ensure the socket is deleted on exit of the daemon */
826         domain_socket_name = talloc_strdup(talloc_autofree_context(), ctdb->daemon.name);
827         if (domain_socket_name == NULL) {
828                 DEBUG(DEBUG_ALERT,(__location__ " talloc_strdup failed.\n"));
829                 exit(12);
830         }
831
832         ctdb->ev = event_context_init(NULL);
833         tevent_loop_allow_nesting(ctdb->ev);
834         ret = ctdb_init_tevent_logging(ctdb);
835         if (ret != 0) {
836                 DEBUG(DEBUG_ALERT,("Failed to initialize TEVENT logging\n"));
837                 exit(1);
838         }
839
840         ctdb_set_child_logging(ctdb);
841
842         /* initialize statistics collection */
843         ctdb_statistics_init(ctdb);
844
845         /* force initial recovery for election */
846         ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
847
848         if (strcmp(ctdb->transport, "tcp") == 0) {
849                 int ctdb_tcp_init(struct ctdb_context *);
850                 ret = ctdb_tcp_init(ctdb);
851         }
852 #ifdef USE_INFINIBAND
853         if (strcmp(ctdb->transport, "ib") == 0) {
854                 int ctdb_ibw_init(struct ctdb_context *);
855                 ret = ctdb_ibw_init(ctdb);
856         }
857 #endif
858         if (ret != 0) {
859                 DEBUG(DEBUG_ERR,("Failed to initialise transport '%s'\n", ctdb->transport));
860                 return -1;
861         }
862
863         if (ctdb->methods == NULL) {
864                 DEBUG(DEBUG_ALERT,(__location__ " Can not initialize transport. ctdb->methods is NULL\n"));
865                 ctdb_fatal(ctdb, "transport is unavailable. can not initialize.");
866         }
867
868         /* initialise the transport  */
869         if (ctdb->methods->initialise(ctdb) != 0) {
870                 ctdb_fatal(ctdb, "transport failed to initialise");
871         }
872         if (public_address_list) {
873                 ret = ctdb_set_public_addresses(ctdb, public_address_list);
874                 if (ret == -1) {
875                         DEBUG(DEBUG_ALERT,("Unable to setup public address list\n"));
876                         exit(1);
877                 }
878         }
879
880
881         /* attach to existing databases */
882         if (ctdb_attach_databases(ctdb) != 0) {
883                 ctdb_fatal(ctdb, "Failed to attach to databases\n");
884         }
885
886         ret = ctdb_event_script(ctdb, CTDB_EVENT_INIT);
887         if (ret != 0) {
888                 ctdb_fatal(ctdb, "Failed to run init event\n");
889         }
890         ctdb_run_notification_script(ctdb, "init");
891
892         /* start frozen, then let the first election sort things out */
893         if (ctdb_blocking_freeze(ctdb)) {
894                 ctdb_fatal(ctdb, "Failed to get initial freeze\n");
895         }
896
897         /* now start accepting clients, only can do this once frozen */
898         fde = event_add_fd(ctdb->ev, ctdb, ctdb->daemon.sd, 
899                            EVENT_FD_READ,
900                            ctdb_accept_client, ctdb);
901         tevent_fd_set_auto_close(fde);
902
903         /* release any IPs we hold from previous runs of the daemon */
904         if (ctdb->tunable.disable_ip_failover == 0) {
905                 ctdb_release_all_ips(ctdb);
906         }
907
908         /* start the transport going */
909         ctdb_start_transport(ctdb);
910
911         /* set up a handler to pick up sigchld */
912         se = event_add_signal(ctdb->ev, ctdb,
913                                      SIGCHLD, 0,
914                                      sig_child_handler,
915                                      ctdb);
916         if (se == NULL) {
917                 DEBUG(DEBUG_CRIT,("Failed to set up signal handler for SIGCHLD\n"));
918                 exit(1);
919         }
920
921         ret = ctdb_event_script_callback(ctdb,
922                                          ctdb,
923                                          ctdb_setup_event_callback,
924                                          ctdb,
925                                          false,
926                                          CTDB_EVENT_SETUP,
927                                          "");
928         if (ret != 0) {
929                 DEBUG(DEBUG_CRIT,("Failed to set up 'setup' event\n"));
930                 exit(1);
931         }
932
933         if (use_syslog) {
934                 if (start_syslog_daemon(ctdb)) {
935                         DEBUG(DEBUG_CRIT, ("Failed to start syslog daemon\n"));
936                         exit(10);
937                 }
938         }
939
940         ctdb_lockdown_memory(ctdb);
941           
942         /* go into a wait loop to allow other nodes to complete */
943         event_loop_wait(ctdb->ev);
944
945         DEBUG(DEBUG_CRIT,("event_loop_wait() returned. this should not happen\n"));
946         exit(1);
947 }
948
949 /*
950   allocate a packet for use in daemon<->daemon communication
951  */
952 struct ctdb_req_header *_ctdb_transport_allocate(struct ctdb_context *ctdb,
953                                                  TALLOC_CTX *mem_ctx, 
954                                                  enum ctdb_operation operation, 
955                                                  size_t length, size_t slength,
956                                                  const char *type)
957 {
958         int size;
959         struct ctdb_req_header *hdr;
960
961         length = MAX(length, slength);
962         size = (length+(CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
963
964         if (ctdb->methods == NULL) {
965                 DEBUG(DEBUG_INFO,(__location__ " Unable to allocate transport packet for operation %u of length %u. Transport is DOWN.\n",
966                          operation, (unsigned)length));
967                 return NULL;
968         }
969
970         hdr = (struct ctdb_req_header *)ctdb->methods->allocate_pkt(mem_ctx, size);
971         if (hdr == NULL) {
972                 DEBUG(DEBUG_ERR,("Unable to allocate transport packet for operation %u of length %u\n",
973                          operation, (unsigned)length));
974                 return NULL;
975         }
976         talloc_set_name_const(hdr, type);
977         memset(hdr, 0, slength);
978         hdr->length       = length;
979         hdr->operation    = operation;
980         hdr->ctdb_magic   = CTDB_MAGIC;
981         hdr->ctdb_version = CTDB_VERSION;
982         hdr->generation   = ctdb->vnn_map->generation;
983         hdr->srcnode      = ctdb->pnn;
984
985         return hdr;     
986 }
987
988 struct daemon_control_state {
989         struct daemon_control_state *next, *prev;
990         struct ctdb_client *client;
991         struct ctdb_req_control *c;
992         uint32_t reqid;
993         struct ctdb_node *node;
994 };
995
996 /*
997   callback when a control reply comes in
998  */
999 static void daemon_control_callback(struct ctdb_context *ctdb,
1000                                     int32_t status, TDB_DATA data, 
1001                                     const char *errormsg,
1002                                     void *private_data)
1003 {
1004         struct daemon_control_state *state = talloc_get_type(private_data, 
1005                                                              struct daemon_control_state);
1006         struct ctdb_client *client = state->client;
1007         struct ctdb_reply_control *r;
1008         size_t len;
1009         int ret;
1010
1011         /* construct a message to send to the client containing the data */
1012         len = offsetof(struct ctdb_reply_control, data) + data.dsize;
1013         if (errormsg) {
1014                 len += strlen(errormsg);
1015         }
1016         r = ctdbd_allocate_pkt(ctdb, state, CTDB_REPLY_CONTROL, len, 
1017                                struct ctdb_reply_control);
1018         CTDB_NO_MEMORY_VOID(ctdb, r);
1019
1020         r->hdr.reqid     = state->reqid;
1021         r->status        = status;
1022         r->datalen       = data.dsize;
1023         r->errorlen = 0;
1024         memcpy(&r->data[0], data.dptr, data.dsize);
1025         if (errormsg) {
1026                 r->errorlen = strlen(errormsg);
1027                 memcpy(&r->data[r->datalen], errormsg, r->errorlen);
1028         }
1029
1030         ret = daemon_queue_send(client, &r->hdr);
1031         if (ret != -1) {
1032                 talloc_free(state);
1033         }
1034 }
1035
1036 /*
1037   fail all pending controls to a disconnected node
1038  */
1039 void ctdb_daemon_cancel_controls(struct ctdb_context *ctdb, struct ctdb_node *node)
1040 {
1041         struct daemon_control_state *state;
1042         while ((state = node->pending_controls)) {
1043                 DLIST_REMOVE(node->pending_controls, state);
1044                 daemon_control_callback(ctdb, (uint32_t)-1, tdb_null, 
1045                                         "node is disconnected", state);
1046         }
1047 }
1048
1049 /*
1050   destroy a daemon_control_state
1051  */
1052 static int daemon_control_destructor(struct daemon_control_state *state)
1053 {
1054         if (state->node) {
1055                 DLIST_REMOVE(state->node->pending_controls, state);
1056         }
1057         return 0;
1058 }
1059
1060 /*
1061   this is called when the ctdb daemon received a ctdb request control
1062   from a local client over the unix domain socket
1063  */
1064 static void daemon_request_control_from_client(struct ctdb_client *client, 
1065                                                struct ctdb_req_control *c)
1066 {
1067         TDB_DATA data;
1068         int res;
1069         struct daemon_control_state *state;
1070         TALLOC_CTX *tmp_ctx = talloc_new(client);
1071
1072         if (c->hdr.destnode == CTDB_CURRENT_NODE) {
1073                 c->hdr.destnode = client->ctdb->pnn;
1074         }
1075
1076         state = talloc(client, struct daemon_control_state);
1077         CTDB_NO_MEMORY_VOID(client->ctdb, state);
1078
1079         state->client = client;
1080         state->c = talloc_steal(state, c);
1081         state->reqid = c->hdr.reqid;
1082         if (ctdb_validate_pnn(client->ctdb, c->hdr.destnode)) {
1083                 state->node = client->ctdb->nodes[c->hdr.destnode];
1084                 DLIST_ADD(state->node->pending_controls, state);
1085         } else {
1086                 state->node = NULL;
1087         }
1088
1089         talloc_set_destructor(state, daemon_control_destructor);
1090
1091         if (c->flags & CTDB_CTRL_FLAG_NOREPLY) {
1092                 talloc_steal(tmp_ctx, state);
1093         }
1094         
1095         data.dptr = &c->data[0];
1096         data.dsize = c->datalen;
1097         res = ctdb_daemon_send_control(client->ctdb, c->hdr.destnode,
1098                                        c->srvid, c->opcode, client->client_id,
1099                                        c->flags,
1100                                        data, daemon_control_callback,
1101                                        state);
1102         if (res != 0) {
1103                 DEBUG(DEBUG_ERR,(__location__ " Failed to send control to remote node %u\n",
1104                          c->hdr.destnode));
1105         }
1106
1107         talloc_free(tmp_ctx);
1108 }
1109
1110 /*
1111   register a call function
1112 */
1113 int ctdb_daemon_set_call(struct ctdb_context *ctdb, uint32_t db_id,
1114                          ctdb_fn_t fn, int id)
1115 {
1116         struct ctdb_registered_call *call;
1117         struct ctdb_db_context *ctdb_db;
1118
1119         ctdb_db = find_ctdb_db(ctdb, db_id);
1120         if (ctdb_db == NULL) {
1121                 return -1;
1122         }
1123
1124         call = talloc(ctdb_db, struct ctdb_registered_call);
1125         call->fn = fn;
1126         call->id = id;
1127
1128         DLIST_ADD(ctdb_db->calls, call);        
1129         return 0;
1130 }
1131
1132
1133
1134 /*
1135   this local messaging handler is ugly, but is needed to prevent
1136   recursion in ctdb_send_message() when the destination node is the
1137   same as the source node
1138  */
1139 struct ctdb_local_message {
1140         struct ctdb_context *ctdb;
1141         uint64_t srvid;
1142         TDB_DATA data;
1143 };
1144
1145 static void ctdb_local_message_trigger(struct event_context *ev, struct timed_event *te, 
1146                                        struct timeval t, void *private_data)
1147 {
1148         struct ctdb_local_message *m = talloc_get_type(private_data, 
1149                                                        struct ctdb_local_message);
1150         int res;
1151
1152         res = ctdb_dispatch_message(m->ctdb, m->srvid, m->data);
1153         if (res != 0) {
1154                 DEBUG(DEBUG_ERR, (__location__ " Failed to dispatch message for srvid=%llu\n", 
1155                           (unsigned long long)m->srvid));
1156         }
1157         talloc_free(m);
1158 }
1159
1160 static int ctdb_local_message(struct ctdb_context *ctdb, uint64_t srvid, TDB_DATA data)
1161 {
1162         struct ctdb_local_message *m;
1163         m = talloc(ctdb, struct ctdb_local_message);
1164         CTDB_NO_MEMORY(ctdb, m);
1165
1166         m->ctdb = ctdb;
1167         m->srvid = srvid;
1168         m->data  = data;
1169         m->data.dptr = talloc_memdup(m, m->data.dptr, m->data.dsize);
1170         if (m->data.dptr == NULL) {
1171                 talloc_free(m);
1172                 return -1;
1173         }
1174
1175         /* this needs to be done as an event to prevent recursion */
1176         event_add_timed(ctdb->ev, m, timeval_zero(), ctdb_local_message_trigger, m);
1177         return 0;
1178 }
1179
1180 /*
1181   send a ctdb message
1182 */
1183 int ctdb_daemon_send_message(struct ctdb_context *ctdb, uint32_t pnn,
1184                              uint64_t srvid, TDB_DATA data)
1185 {
1186         struct ctdb_req_message *r;
1187         int len;
1188
1189         if (ctdb->methods == NULL) {
1190                 DEBUG(DEBUG_INFO,(__location__ " Failed to send message. Transport is DOWN\n"));
1191                 return -1;
1192         }
1193
1194         /* see if this is a message to ourselves */
1195         if (pnn == ctdb->pnn) {
1196                 return ctdb_local_message(ctdb, srvid, data);
1197         }
1198
1199         len = offsetof(struct ctdb_req_message, data) + data.dsize;
1200         r = ctdb_transport_allocate(ctdb, ctdb, CTDB_REQ_MESSAGE, len,
1201                                     struct ctdb_req_message);
1202         CTDB_NO_MEMORY(ctdb, r);
1203
1204         r->hdr.destnode  = pnn;
1205         r->srvid         = srvid;
1206         r->datalen       = data.dsize;
1207         memcpy(&r->data[0], data.dptr, data.dsize);
1208
1209         ctdb_queue_packet(ctdb, &r->hdr);
1210
1211         talloc_free(r);
1212         return 0;
1213 }
1214
1215
1216
1217 struct ctdb_client_notify_list {
1218         struct ctdb_client_notify_list *next, *prev;
1219         struct ctdb_context *ctdb;
1220         uint64_t srvid;
1221         TDB_DATA data;
1222 };
1223
1224
1225 static int ctdb_client_notify_destructor(struct ctdb_client_notify_list *nl)
1226 {
1227         int ret;
1228
1229         DEBUG(DEBUG_ERR,("Sending client notify message for srvid:%llu\n", (unsigned long long)nl->srvid));
1230
1231         ret = ctdb_daemon_send_message(nl->ctdb, CTDB_BROADCAST_CONNECTED, (unsigned long long)nl->srvid, nl->data);
1232         if (ret != 0) {
1233                 DEBUG(DEBUG_ERR,("Failed to send client notify message\n"));
1234         }
1235
1236         return 0;
1237 }
1238
1239 int32_t ctdb_control_register_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1240 {
1241         struct ctdb_client_notify_register *notify = (struct ctdb_client_notify_register *)indata.dptr;
1242         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1243         struct ctdb_client_notify_list *nl;
1244
1245         DEBUG(DEBUG_INFO,("Register srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1246
1247         if (indata.dsize < offsetof(struct ctdb_client_notify_register, notify_data)) {
1248                 DEBUG(DEBUG_ERR,(__location__ " Too little data in control : %d\n", (int)indata.dsize));
1249                 return -1;
1250         }
1251
1252         if (indata.dsize != (notify->len + offsetof(struct ctdb_client_notify_register, notify_data))) {
1253                 DEBUG(DEBUG_ERR,(__location__ " Wrong amount of data in control. Got %d, expected %d\n", (int)indata.dsize, (int)(notify->len + offsetof(struct ctdb_client_notify_register, notify_data))));
1254                 return -1;
1255         }
1256
1257
1258         if (client == NULL) {
1259                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1260                 return -1;
1261         }
1262
1263         for(nl=client->notify; nl; nl=nl->next) {
1264                 if (nl->srvid == notify->srvid) {
1265                         break;
1266                 }
1267         }
1268         if (nl != NULL) {
1269                 DEBUG(DEBUG_ERR,(__location__ " Notification for srvid:%llu already exists for this client\n", (unsigned long long)notify->srvid));
1270                 return -1;
1271         }
1272
1273         nl = talloc(client, struct ctdb_client_notify_list);
1274         CTDB_NO_MEMORY(ctdb, nl);
1275         nl->ctdb       = ctdb;
1276         nl->srvid      = notify->srvid;
1277         nl->data.dsize = notify->len;
1278         nl->data.dptr  = talloc_size(nl, nl->data.dsize);
1279         CTDB_NO_MEMORY(ctdb, nl->data.dptr);
1280         memcpy(nl->data.dptr, notify->notify_data, nl->data.dsize);
1281         
1282         DLIST_ADD(client->notify, nl);
1283         talloc_set_destructor(nl, ctdb_client_notify_destructor);
1284
1285         return 0;
1286 }
1287
1288 int32_t ctdb_control_deregister_notify(struct ctdb_context *ctdb, uint32_t client_id, TDB_DATA indata)
1289 {
1290         struct ctdb_client_notify_deregister *notify = (struct ctdb_client_notify_deregister *)indata.dptr;
1291         struct ctdb_client *client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client); 
1292         struct ctdb_client_notify_list *nl;
1293
1294         DEBUG(DEBUG_INFO,("Deregister srvid %llu for client %d\n", (unsigned long long)notify->srvid, client_id));
1295
1296         if (client == NULL) {
1297                 DEBUG(DEBUG_ERR,(__location__ " Could not find client parent structure. You can not send this control to a remote node\n"));
1298                 return -1;
1299         }
1300
1301         for(nl=client->notify; nl; nl=nl->next) {
1302                 if (nl->srvid == notify->srvid) {
1303                         break;
1304                 }
1305         }
1306         if (nl == NULL) {
1307                 DEBUG(DEBUG_ERR,(__location__ " No notification for srvid:%llu found for this client\n", (unsigned long long)notify->srvid));
1308                 return -1;
1309         }
1310
1311         DLIST_REMOVE(client->notify, nl);
1312         talloc_set_destructor(nl, NULL);
1313         talloc_free(nl);
1314
1315         return 0;
1316 }
1317
1318 struct ctdb_client *ctdb_find_client_by_pid(struct ctdb_context *ctdb, pid_t pid)
1319 {
1320         struct ctdb_client_pid_list *client_pid;
1321
1322         for (client_pid = ctdb->client_pids; client_pid; client_pid=client_pid->next) {
1323                 if (client_pid->pid == pid) {
1324                         return client_pid->client;
1325                 }
1326         }
1327         return NULL;
1328 }
1329
1330
1331 /* This control is used by samba when probing if a process (of a samba daemon)
1332    exists on the node.
1333    Samba does this when it needs/wants to check if a subrecord in one of the
1334    databases is still valied, or if it is stale and can be removed.
1335    If the node is in unhealthy or stopped state we just kill of the samba
1336    process holding htis sub-record and return to the calling samba that
1337    the process does not exist.
1338    This allows us to forcefully recall subrecords registered by samba processes
1339    on banned and stopped nodes.
1340 */
1341 int32_t ctdb_control_process_exists(struct ctdb_context *ctdb, pid_t pid)
1342 {
1343         struct ctdb_client *client;
1344
1345         if (ctdb->nodes[ctdb->pnn]->flags & (NODE_FLAGS_BANNED|NODE_FLAGS_STOPPED)) {
1346                 client = ctdb_find_client_by_pid(ctdb, pid);
1347                 if (client != NULL) {
1348                         DEBUG(DEBUG_NOTICE,(__location__ " Killing client with pid:%d on banned/stopped node\n", (int)pid));
1349                         talloc_free(client);
1350                 }
1351                 return -1;
1352         }
1353
1354         return kill(pid, 0);
1355 }