messages_dgm: Use saved errno value
[samba.git] / source3 / lib / messages_dgm.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Samba internal messaging functions
4  * Copyright (C) 2013 by Volker Lendecke
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/network.h"
22 #include "system/filesys.h"
23 #include "system/dir.h"
24 #include "system/select.h"
25 #include "lib/util/debug.h"
26 #include "lib/messages_dgm.h"
27 #include "lib/util/genrand.h"
28 #include "lib/util/dlinklist.h"
29 #include "lib/pthreadpool/pthreadpool_tevent.h"
30 #include "lib/util/msghdr.h"
31 #include "lib/util/iov_buf.h"
32 #include "lib/util/blocking.h"
33 #include "lib/util/tevent_unix.h"
34
35 #define MESSAGING_DGM_FRAGMENT_LENGTH 1024
36
37 struct sun_path_buf {
38         /*
39          * This will carry enough for a socket path
40          */
41         char buf[sizeof(struct sockaddr_un)];
42 };
43
44 /*
45  * We can only have one tevent_fd per dgm_context and per
46  * tevent_context. Maintain a list of registered tevent_contexts per
47  * dgm_context.
48  */
49 struct messaging_dgm_fde_ev {
50         struct messaging_dgm_fde_ev *prev, *next;
51
52         /*
53          * Backreference to enable DLIST_REMOVE from our
54          * destructor. Also, set to NULL when the dgm_context dies
55          * before the messaging_dgm_fde_ev.
56          */
57         struct messaging_dgm_context *ctx;
58
59         struct tevent_context *ev;
60         struct tevent_fd *fde;
61 };
62
63 struct messaging_dgm_out {
64         struct messaging_dgm_out *prev, *next;
65         struct messaging_dgm_context *ctx;
66
67         pid_t pid;
68         int sock;
69         bool is_blocking;
70         uint64_t cookie;
71
72         struct tevent_queue *queue;
73         struct tevent_timer *idle_timer;
74 };
75
76 struct messaging_dgm_in_msg {
77         struct messaging_dgm_in_msg *prev, *next;
78         struct messaging_dgm_context *ctx;
79         size_t msglen;
80         size_t received;
81         pid_t sender_pid;
82         int sender_sock;
83         uint64_t cookie;
84         uint8_t buf[];
85 };
86
87 struct messaging_dgm_context {
88         struct tevent_context *ev;
89         pid_t pid;
90         struct sun_path_buf socket_dir;
91         struct sun_path_buf lockfile_dir;
92         int lockfile_fd;
93
94         int sock;
95         struct messaging_dgm_in_msg *in_msgs;
96
97         struct messaging_dgm_fde_ev *fde_evs;
98         void (*recv_cb)(struct tevent_context *ev,
99                         const uint8_t *msg,
100                         size_t msg_len,
101                         int *fds,
102                         size_t num_fds,
103                         void *private_data);
104         void *recv_cb_private_data;
105
106         bool *have_dgm_context;
107
108         struct pthreadpool_tevent *pool;
109         struct messaging_dgm_out *outsocks;
110 };
111
112 /* Set socket close on exec. */
113 static int prepare_socket_cloexec(int sock)
114 {
115 #ifdef FD_CLOEXEC
116         int flags;
117
118         flags = fcntl(sock, F_GETFD, 0);
119         if (flags == -1) {
120                 return errno;
121         }
122         flags |= FD_CLOEXEC;
123         if (fcntl(sock, F_SETFD, flags) == -1) {
124                 return errno;
125         }
126 #endif
127         return 0;
128 }
129
130 static void close_fd_array(int *fds, size_t num_fds)
131 {
132         size_t i;
133
134         for (i = 0; i < num_fds; i++) {
135                 if (fds[i] == -1) {
136                         continue;
137                 }
138
139                 close(fds[i]);
140                 fds[i] = -1;
141         }
142 }
143
144 /*
145  * The idle handler can free the struct messaging_dgm_out *,
146  * if it's unused (qlen of zero) which closes the socket.
147  */
148
149 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
150                                            struct tevent_timer *te,
151                                            struct timeval current_time,
152                                            void *private_data)
153 {
154         struct messaging_dgm_out *out = talloc_get_type_abort(
155                 private_data, struct messaging_dgm_out);
156         size_t qlen;
157
158         out->idle_timer = NULL;
159
160         qlen = tevent_queue_length(out->queue);
161         if (qlen == 0) {
162                 TALLOC_FREE(out);
163         }
164 }
165
166 /*
167  * Setup the idle handler to fire afer 1 second if the
168  * queue is zero.
169  */
170
171 static void messaging_dgm_out_rearm_idle_timer(struct messaging_dgm_out *out)
172 {
173         size_t qlen;
174
175         qlen = tevent_queue_length(out->queue);
176         if (qlen != 0) {
177                 TALLOC_FREE(out->idle_timer);
178                 return;
179         }
180
181         if (out->idle_timer != NULL) {
182                 tevent_update_timer(out->idle_timer,
183                                     tevent_timeval_current_ofs(1, 0));
184                 return;
185         }
186
187         out->idle_timer = tevent_add_timer(
188                 out->ctx->ev, out, tevent_timeval_current_ofs(1, 0),
189                 messaging_dgm_out_idle_handler, out);
190         /*
191          * No NULL check, we'll come back here. Worst case we're
192          * leaking a bit.
193          */
194 }
195
196 static int messaging_dgm_out_destructor(struct messaging_dgm_out *dst);
197 static void messaging_dgm_out_idle_handler(struct tevent_context *ev,
198                                            struct tevent_timer *te,
199                                            struct timeval current_time,
200                                            void *private_data);
201
202 /*
203  * Connect to an existing rendezvous point for another
204  * pid - wrapped inside a struct messaging_dgm_out *.
205  */
206
207 static int messaging_dgm_out_create(TALLOC_CTX *mem_ctx,
208                                     struct messaging_dgm_context *ctx,
209                                     pid_t pid, struct messaging_dgm_out **pout)
210 {
211         struct messaging_dgm_out *out;
212         struct sockaddr_un addr = { .sun_family = AF_UNIX };
213         int ret = ENOMEM;
214         int out_pathlen;
215         char addr_buf[sizeof(addr.sun_path) + (3 * sizeof(unsigned) + 2)];
216
217         out = talloc(mem_ctx, struct messaging_dgm_out);
218         if (out == NULL) {
219                 goto fail;
220         }
221
222         *out = (struct messaging_dgm_out) {
223                 .pid = pid,
224                 .ctx = ctx,
225                 .cookie = 1
226         };
227
228         out_pathlen = snprintf(addr_buf, sizeof(addr_buf),
229                                "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
230         if (out_pathlen < 0) {
231                 goto errno_fail;
232         }
233         if ((size_t)out_pathlen >= sizeof(addr.sun_path)) {
234                 ret = ENAMETOOLONG;
235                 goto fail;
236         }
237
238         memcpy(addr.sun_path, addr_buf, out_pathlen + 1);
239
240         out->queue = tevent_queue_create(out, addr.sun_path);
241         if (out->queue == NULL) {
242                 ret = ENOMEM;
243                 goto fail;
244         }
245
246         out->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
247         if (out->sock == -1) {
248                 goto errno_fail;
249         }
250
251         DLIST_ADD(ctx->outsocks, out);
252         talloc_set_destructor(out, messaging_dgm_out_destructor);
253
254         do {
255                 ret = connect(out->sock,
256                               (const struct sockaddr *)(const void *)&addr,
257                               sizeof(addr));
258         } while ((ret == -1) && (errno == EINTR));
259
260         if (ret == -1) {
261                 goto errno_fail;
262         }
263
264         ret = set_blocking(out->sock, false);
265         if (ret == -1) {
266                 goto errno_fail;
267         }
268         out->is_blocking = false;
269
270         *pout = out;
271         return 0;
272 errno_fail:
273         ret = errno;
274 fail:
275         TALLOC_FREE(out);
276         return ret;
277 }
278
279 static int messaging_dgm_out_destructor(struct messaging_dgm_out *out)
280 {
281         DLIST_REMOVE(out->ctx->outsocks, out);
282
283         if ((tevent_queue_length(out->queue) != 0) &&
284             (getpid() == out->ctx->pid)) {
285                 /*
286                  * We have pending jobs. We can't close the socket,
287                  * this has been handed over to messaging_dgm_out_queue_state.
288                  */
289                 return 0;
290         }
291
292         if (out->sock != -1) {
293                 close(out->sock);
294                 out->sock = -1;
295         }
296         return 0;
297 }
298
299 /*
300  * Find the struct messaging_dgm_out * to talk to pid.
301  * If we don't have one, create it. Set the timer to
302  * delete after 1 sec.
303  */
304
305 static int messaging_dgm_out_get(struct messaging_dgm_context *ctx, pid_t pid,
306                                  struct messaging_dgm_out **pout)
307 {
308         struct messaging_dgm_out *out;
309         int ret;
310
311         for (out = ctx->outsocks; out != NULL; out = out->next) {
312                 if (out->pid == pid) {
313                         break;
314                 }
315         }
316
317         if (out == NULL) {
318                 ret = messaging_dgm_out_create(ctx, ctx, pid, &out);
319                 if (ret != 0) {
320                         return ret;
321                 }
322         }
323
324         messaging_dgm_out_rearm_idle_timer(out);
325
326         *pout = out;
327         return 0;
328 }
329
330 /*
331  * This function is called directly to send a message fragment
332  * when the outgoing queue is zero, and from a pthreadpool
333  * job thread when messages are being queued (qlen != 0).
334  * Make sure *ONLY* thread-safe functions are called within.
335  */
336
337 static ssize_t messaging_dgm_sendmsg(int sock,
338                                      const struct iovec *iov, int iovlen,
339                                      const int *fds, size_t num_fds,
340                                      int *perrno)
341 {
342         struct msghdr msg;
343         ssize_t fdlen, ret;
344
345         /*
346          * Do the actual sendmsg syscall. This will be called from a
347          * pthreadpool helper thread, so be careful what you do here.
348          */
349
350         msg = (struct msghdr) {
351                 .msg_iov = discard_const_p(struct iovec, iov),
352                 .msg_iovlen = iovlen
353         };
354
355         fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
356         if (fdlen == -1) {
357                 *perrno = EINVAL;
358                 return -1;
359         }
360
361         {
362                 uint8_t buf[fdlen];
363
364                 msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
365
366                 do {
367                         ret = sendmsg(sock, &msg, 0);
368                 } while ((ret == -1) && (errno == EINTR));
369         }
370
371         if (ret == -1) {
372                 *perrno = errno;
373         }
374         return ret;
375 }
376
377 struct messaging_dgm_out_queue_state {
378         struct tevent_context *ev;
379         struct pthreadpool_tevent *pool;
380
381         struct tevent_req *req;
382         struct tevent_req *subreq;
383
384         int sock;
385
386         int *fds;
387         uint8_t *buf;
388
389         ssize_t sent;
390         int err;
391 };
392
393 static int messaging_dgm_out_queue_state_destructor(
394         struct messaging_dgm_out_queue_state *state);
395 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
396                                            void *private_data);
397 static void messaging_dgm_out_threaded_job(void *private_data);
398 static void messaging_dgm_out_queue_done(struct tevent_req *subreq);
399
400 /*
401  * Push a message fragment onto a queue to be sent by a
402  * threadpool job. Makes copies of data/fd's to be sent.
403  * The running tevent_queue internally creates an immediate
404  * event to schedule the write.
405  */
406
407 static struct tevent_req *messaging_dgm_out_queue_send(
408         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
409         struct messaging_dgm_out *out,
410         const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
411 {
412         struct tevent_req *req;
413         struct messaging_dgm_out_queue_state *state;
414         struct tevent_queue_entry *e;
415         size_t i;
416         ssize_t buflen;
417
418         req = tevent_req_create(out, &state,
419                                 struct messaging_dgm_out_queue_state);
420         if (req == NULL) {
421                 return NULL;
422         }
423         state->ev = ev;
424         state->pool = out->ctx->pool;
425         state->sock = out->sock;
426         state->req = req;
427
428         /*
429          * Go blocking in a thread
430          */
431         if (!out->is_blocking) {
432                 int ret = set_blocking(out->sock, true);
433                 if (ret == -1) {
434                         tevent_req_error(req, errno);
435                         return tevent_req_post(req, ev);
436                 }
437                 out->is_blocking = true;
438         }
439
440         buflen = iov_buflen(iov, iovlen);
441         if (buflen == -1) {
442                 tevent_req_error(req, EMSGSIZE);
443                 return tevent_req_post(req, ev);
444         }
445
446         state->buf = talloc_array(state, uint8_t, buflen);
447         if (tevent_req_nomem(state->buf, req)) {
448                 return tevent_req_post(req, ev);
449         }
450         iov_buf(iov, iovlen, state->buf, buflen);
451
452         state->fds = talloc_array(state, int, num_fds);
453         if (tevent_req_nomem(state->fds, req)) {
454                 return tevent_req_post(req, ev);
455         }
456
457         for (i=0; i<num_fds; i++) {
458                 state->fds[i] = -1;
459         }
460
461         for (i=0; i<num_fds; i++) {
462
463                 state->fds[i] = dup(fds[i]);
464
465                 if (state->fds[i] == -1) {
466                         int ret = errno;
467
468                         close_fd_array(state->fds, num_fds);
469
470                         tevent_req_error(req, ret);
471                         return tevent_req_post(req, ev);
472                 }
473         }
474
475         talloc_set_destructor(state, messaging_dgm_out_queue_state_destructor);
476
477         e = tevent_queue_add_entry(out->queue, ev, req,
478                                    messaging_dgm_out_queue_trigger, req);
479         if (tevent_req_nomem(e, req)) {
480                 return tevent_req_post(req, ev);
481         }
482         return req;
483 }
484
485 static int messaging_dgm_out_queue_state_destructor(
486         struct messaging_dgm_out_queue_state *state)
487 {
488         int *fds;
489         size_t num_fds;
490
491         if (state->subreq != NULL) {
492                 /*
493                  * We're scheduled, but we're destroyed. This happens
494                  * if the messaging_dgm_context is destroyed while
495                  * we're stuck in a blocking send. There's nothing we
496                  * can do but to leak memory.
497                  */
498                 TALLOC_FREE(state->subreq);
499                 (void)talloc_reparent(state->req, NULL, state);
500                 return -1;
501         }
502
503         fds = state->fds;
504         num_fds = talloc_array_length(fds);
505         close_fd_array(fds, num_fds);
506         return 0;
507 }
508
509 /*
510  * tevent_queue callback that schedules the pthreadpool to actually
511  * send the queued message fragment.
512  */
513
514 static void messaging_dgm_out_queue_trigger(struct tevent_req *req,
515                                            void *private_data)
516 {
517         struct messaging_dgm_out_queue_state *state = tevent_req_data(
518                 req, struct messaging_dgm_out_queue_state);
519
520         tevent_req_reset_endtime(req);
521
522         state->subreq = pthreadpool_tevent_job_send(
523                 state, state->ev, state->pool,
524                 messaging_dgm_out_threaded_job, state);
525         if (tevent_req_nomem(state->subreq, req)) {
526                 return;
527         }
528         tevent_req_set_callback(state->subreq, messaging_dgm_out_queue_done,
529                                 req);
530 }
531
532 /*
533  * Wrapper function run by the pthread that calls
534  * messaging_dgm_sendmsg() to actually do the sendmsg().
535  */
536
537 static void messaging_dgm_out_threaded_job(void *private_data)
538 {
539         struct messaging_dgm_out_queue_state *state = talloc_get_type_abort(
540                 private_data, struct messaging_dgm_out_queue_state);
541
542         struct iovec iov = { .iov_base = state->buf,
543                              .iov_len = talloc_get_size(state->buf) };
544         size_t num_fds = talloc_array_length(state->fds);
545         int msec = 1;
546
547         while (true) {
548                 int ret;
549
550                 state->sent = messaging_dgm_sendmsg(state->sock, &iov, 1,
551                                             state->fds, num_fds, &state->err);
552
553                 if (state->sent != -1) {
554                         return;
555                 }
556                 if (state->err != ENOBUFS) {
557                         return;
558                 }
559
560                 /*
561                  * ENOBUFS is the FreeBSD way of saying "Try
562                  * again". We have to do polling.
563                  */
564                 do {
565                         ret = poll(NULL, 0, msec);
566                 } while ((ret == -1) && (errno == EINTR));
567
568                 /*
569                  * Exponential backoff up to once a second
570                  */
571                 msec *= 2;
572                 msec = MIN(msec, 1000);
573         }
574 }
575
576 /*
577  * Pickup the results of the pthread sendmsg().
578  */
579
580 static void messaging_dgm_out_queue_done(struct tevent_req *subreq)
581 {
582         struct tevent_req *req = tevent_req_callback_data(
583                 subreq, struct tevent_req);
584         struct messaging_dgm_out_queue_state *state = tevent_req_data(
585                 req, struct messaging_dgm_out_queue_state);
586         int ret;
587
588         if (subreq != state->subreq) {
589                 abort();
590         }
591
592         ret = pthreadpool_tevent_job_recv(subreq);
593
594         TALLOC_FREE(subreq);
595         state->subreq = NULL;
596
597         if (tevent_req_error(req, ret)) {
598                 return;
599         }
600         if (state->sent == -1) {
601                 tevent_req_error(req, state->err);
602                 return;
603         }
604         tevent_req_done(req);
605 }
606
607 static int messaging_dgm_out_queue_recv(struct tevent_req *req)
608 {
609         return tevent_req_simple_recv_unix(req);
610 }
611
612 static void messaging_dgm_out_sent_fragment(struct tevent_req *req);
613
614 /*
615  * Core function to send a message fragment given a
616  * connected struct messaging_dgm_out * destination.
617  * If no current queue tries to send nonblocking
618  * directly. If not, queues the fragment (which makes
619  * a copy of it) and adds a 60-second timeout on the send.
620  */
621
622 static int messaging_dgm_out_send_fragment(
623         struct tevent_context *ev, struct messaging_dgm_out *out,
624         const struct iovec *iov, int iovlen, const int *fds, size_t num_fds)
625 {
626         struct tevent_req *req;
627         size_t qlen;
628         bool ok;
629
630         qlen = tevent_queue_length(out->queue);
631         if (qlen == 0) {
632                 ssize_t nsent;
633                 int err = 0;
634
635                 if (out->is_blocking) {
636                         int ret = set_blocking(out->sock, false);
637                         if (ret == -1) {
638                                 return errno;
639                         }
640                         out->is_blocking = false;
641                 }
642
643                 nsent = messaging_dgm_sendmsg(out->sock, iov, iovlen, fds,
644                                               num_fds, &err);
645                 if (nsent >= 0) {
646                         return 0;
647                 }
648
649                 if (err == ENOBUFS) {
650                         /*
651                          * FreeBSD's way of telling us the dst socket
652                          * is full. EWOULDBLOCK makes us spawn a
653                          * polling helper thread.
654                          */
655                         err = EWOULDBLOCK;
656                 }
657
658                 if (err != EWOULDBLOCK) {
659                         return err;
660                 }
661         }
662
663         req = messaging_dgm_out_queue_send(out, ev, out, iov, iovlen,
664                                            fds, num_fds);
665         if (req == NULL) {
666                 return ENOMEM;
667         }
668         tevent_req_set_callback(req, messaging_dgm_out_sent_fragment, out);
669
670         ok = tevent_req_set_endtime(req, ev,
671                                     tevent_timeval_current_ofs(60, 0));
672         if (!ok) {
673                 TALLOC_FREE(req);
674                 return ENOMEM;
675         }
676
677         return 0;
678 }
679
680 /*
681  * Pickup the result of the fragment send. Reset idle timer
682  * if queue empty.
683  */
684
685 static void messaging_dgm_out_sent_fragment(struct tevent_req *req)
686 {
687         struct messaging_dgm_out *out = tevent_req_callback_data(
688                 req, struct messaging_dgm_out);
689         int ret;
690
691         ret = messaging_dgm_out_queue_recv(req);
692         TALLOC_FREE(req);
693
694         if (ret != 0) {
695                 DBG_WARNING("messaging_out_queue_recv returned %s\n",
696                             strerror(ret));
697         }
698
699         messaging_dgm_out_rearm_idle_timer(out);
700 }
701
702
703 struct messaging_dgm_fragment_hdr {
704         size_t msglen;
705         pid_t pid;
706         int sock;
707 };
708
709 /*
710  * Fragment a message into MESSAGING_DGM_FRAGMENT_LENGTH - 64-bit cookie
711  * size chunks and send it.
712  *
713  * Message fragments are prefixed by a 64-bit cookie that
714  * stays the same for all fragments. This allows the receiver
715  * to recognise fragments of the same message and re-assemble
716  * them on the other end.
717  *
718  * Note that this allows other message fragments from other
719  * senders to be interleaved in the receive read processing,
720  * the combination of the cookie and header info allows unique
721  * identification of the message from a specific sender in
722  * re-assembly.
723  *
724  * If the message is smaller than MESSAGING_DGM_FRAGMENT_LENGTH - cookie
725  * then send a single message with cookie set to zero.
726  *
727  * Otherwise the message is fragmented into chunks and added
728  * to the sending queue. Any file descriptors are passed only
729  * in the last fragment.
730  *
731  * Finally the cookie is incremented (wrap over zero) to
732  * prepare for the next message sent to this channel.
733  *
734  */
735
736 static int messaging_dgm_out_send_fragmented(struct tevent_context *ev,
737                                              struct messaging_dgm_out *out,
738                                              const struct iovec *iov,
739                                              int iovlen,
740                                              const int *fds, size_t num_fds)
741 {
742         ssize_t msglen, sent;
743         int ret = 0;
744         struct iovec iov_copy[iovlen+2];
745         struct messaging_dgm_fragment_hdr hdr;
746         struct iovec src_iov;
747
748         if (iovlen < 0) {
749                 return EINVAL;
750         }
751
752         msglen = iov_buflen(iov, iovlen);
753         if (msglen == -1) {
754                 return EMSGSIZE;
755         }
756         if (num_fds > INT8_MAX) {
757                 return EINVAL;
758         }
759
760         if ((size_t) msglen <=
761             (MESSAGING_DGM_FRAGMENT_LENGTH - sizeof(uint64_t))) {
762                 uint64_t cookie = 0;
763
764                 iov_copy[0].iov_base = &cookie;
765                 iov_copy[0].iov_len = sizeof(cookie);
766                 if (iovlen > 0) {
767                         memcpy(&iov_copy[1], iov,
768                                sizeof(struct iovec) * iovlen);
769                 }
770
771                 return messaging_dgm_out_send_fragment(
772                         ev, out, iov_copy, iovlen+1, fds, num_fds);
773
774         }
775
776         hdr = (struct messaging_dgm_fragment_hdr) {
777                 .msglen = msglen,
778                 .pid = getpid(),
779                 .sock = out->sock
780         };
781
782         iov_copy[0].iov_base = &out->cookie;
783         iov_copy[0].iov_len = sizeof(out->cookie);
784         iov_copy[1].iov_base = &hdr;
785         iov_copy[1].iov_len = sizeof(hdr);
786
787         sent = 0;
788         src_iov = iov[0];
789
790         /*
791          * The following write loop sends the user message in pieces. We have
792          * filled the first two iovecs above with "cookie" and "hdr". In the
793          * following loops we pull message chunks from the user iov array and
794          * fill iov_copy piece by piece, possibly truncating chunks from the
795          * caller's iov array. Ugly, but hopefully efficient.
796          */
797
798         while (sent < msglen) {
799                 size_t fragment_len;
800                 size_t iov_index = 2;
801
802                 fragment_len = sizeof(out->cookie) + sizeof(hdr);
803
804                 while (fragment_len < MESSAGING_DGM_FRAGMENT_LENGTH) {
805                         size_t space, chunk;
806
807                         space = MESSAGING_DGM_FRAGMENT_LENGTH - fragment_len;
808                         chunk = MIN(space, src_iov.iov_len);
809
810                         iov_copy[iov_index].iov_base = src_iov.iov_base;
811                         iov_copy[iov_index].iov_len = chunk;
812                         iov_index += 1;
813
814                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
815                         src_iov.iov_len -= chunk;
816                         fragment_len += chunk;
817
818                         if (src_iov.iov_len == 0) {
819                                 iov += 1;
820                                 iovlen -= 1;
821                                 if (iovlen == 0) {
822                                         break;
823                                 }
824                                 src_iov = iov[0];
825                         }
826                 }
827                 sent += (fragment_len - sizeof(out->cookie) - sizeof(hdr));
828
829                 /*
830                  * only the last fragment should pass the fd array.
831                  * That simplifies the receiver a lot.
832                  */
833                 if (sent < msglen) {
834                         ret = messaging_dgm_out_send_fragment(
835                                 ev, out, iov_copy, iov_index, NULL, 0);
836                 } else {
837                         ret = messaging_dgm_out_send_fragment(
838                                 ev, out, iov_copy, iov_index, fds, num_fds);
839                 }
840                 if (ret != 0) {
841                         break;
842                 }
843         }
844
845         out->cookie += 1;
846         if (out->cookie == 0) {
847                 out->cookie += 1;
848         }
849
850         return ret;
851 }
852
853 static struct messaging_dgm_context *global_dgm_context;
854
855 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
856
857 static int messaging_dgm_lockfile_create(struct messaging_dgm_context *ctx,
858                                          pid_t pid, int *plockfile_fd,
859                                          uint64_t *punique)
860 {
861         char buf[64];
862         int lockfile_fd;
863         struct sun_path_buf lockfile_name;
864         struct flock lck;
865         uint64_t unique;
866         int unique_len, ret;
867         ssize_t written;
868
869         ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
870                        "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
871         if (ret < 0) {
872                 return errno;
873         }
874         if ((unsigned)ret >= sizeof(lockfile_name.buf)) {
875                 return ENAMETOOLONG;
876         }
877
878         /* no O_EXCL, existence check is via the fcntl lock */
879
880         lockfile_fd = open(lockfile_name.buf, O_NONBLOCK|O_CREAT|O_RDWR,
881                            0644);
882
883         if ((lockfile_fd == -1) &&
884             ((errno == ENXIO) /* Linux */ ||
885              (errno == ENODEV) /* Linux kernel bug */ ||
886              (errno == EOPNOTSUPP) /* FreeBSD */)) {
887                 /*
888                  * Huh -- a socket? This might be a stale socket from
889                  * an upgrade of Samba. Just unlink and retry, nobody
890                  * else is supposed to be here at this time.
891                  *
892                  * Yes, this is racy, but I don't see a way to deal
893                  * with this properly.
894                  */
895                 unlink(lockfile_name.buf);
896
897                 lockfile_fd = open(lockfile_name.buf,
898                                    O_NONBLOCK|O_CREAT|O_WRONLY,
899                                    0644);
900         }
901
902         if (lockfile_fd == -1) {
903                 ret = errno;
904                 DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
905                 return ret;
906         }
907
908         lck = (struct flock) {
909                 .l_type = F_WRLCK,
910                 .l_whence = SEEK_SET
911         };
912
913         ret = fcntl(lockfile_fd, F_SETLK, &lck);
914         if (ret == -1) {
915                 ret = errno;
916                 DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
917                 goto fail_close;
918         }
919
920         /*
921          * Directly using the binary value for
922          * SERVERID_UNIQUE_ID_NOT_TO_VERIFY is a layering
923          * violation. But including all of ndr here just for this
924          * seems to be a bit overkill to me. Also, messages_dgm might
925          * be replaced sooner or later by something streams-based,
926          * where unique_id generation will be handled differently.
927          */
928
929         do {
930                 generate_random_buffer((uint8_t *)&unique, sizeof(unique));
931         } while (unique == UINT64_C(0xFFFFFFFFFFFFFFFF));
932
933         unique_len = snprintf(buf, sizeof(buf), "%ju\n", (uintmax_t)unique);
934
935         /* shorten a potentially preexisting file */
936
937         ret = ftruncate(lockfile_fd, unique_len);
938         if (ret == -1) {
939                 ret = errno;
940                 DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
941                           strerror(ret)));
942                 goto fail_unlink;
943         }
944
945         written = write(lockfile_fd, buf, unique_len);
946         if (written != unique_len) {
947                 ret = errno;
948                 DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
949                 goto fail_unlink;
950         }
951
952         *plockfile_fd = lockfile_fd;
953         *punique = unique;
954         return 0;
955
956 fail_unlink:
957         unlink(lockfile_name.buf);
958 fail_close:
959         close(lockfile_fd);
960         return ret;
961 }
962
963 static void messaging_dgm_read_handler(struct tevent_context *ev,
964                                        struct tevent_fd *fde,
965                                        uint16_t flags,
966                                        void *private_data);
967
968 /*
969  * Create the rendezvous point in the file system
970  * that other processes can use to send messages to
971  * this pid.
972  */
973
974 int messaging_dgm_init(struct tevent_context *ev,
975                        uint64_t *punique,
976                        const char *socket_dir,
977                        const char *lockfile_dir,
978                        void (*recv_cb)(struct tevent_context *ev,
979                                        const uint8_t *msg,
980                                        size_t msg_len,
981                                        int *fds,
982                                        size_t num_fds,
983                                        void *private_data),
984                        void *recv_cb_private_data)
985 {
986         struct messaging_dgm_context *ctx;
987         int ret;
988         struct sockaddr_un socket_address;
989         size_t len;
990         static bool have_dgm_context = false;
991
992         if (have_dgm_context) {
993                 return EEXIST;
994         }
995
996         ctx = talloc_zero(NULL, struct messaging_dgm_context);
997         if (ctx == NULL) {
998                 goto fail_nomem;
999         }
1000         ctx->ev = ev;
1001         ctx->pid = getpid();
1002         ctx->recv_cb = recv_cb;
1003         ctx->recv_cb_private_data = recv_cb_private_data;
1004
1005         len = strlcpy(ctx->lockfile_dir.buf, lockfile_dir,
1006                       sizeof(ctx->lockfile_dir.buf));
1007         if (len >= sizeof(ctx->lockfile_dir.buf)) {
1008                 TALLOC_FREE(ctx);
1009                 return ENAMETOOLONG;
1010         }
1011
1012         len = strlcpy(ctx->socket_dir.buf, socket_dir,
1013                       sizeof(ctx->socket_dir.buf));
1014         if (len >= sizeof(ctx->socket_dir.buf)) {
1015                 TALLOC_FREE(ctx);
1016                 return ENAMETOOLONG;
1017         }
1018
1019         socket_address = (struct sockaddr_un) { .sun_family = AF_UNIX };
1020         len = snprintf(socket_address.sun_path,
1021                        sizeof(socket_address.sun_path),
1022                        "%s/%u", socket_dir, (unsigned)ctx->pid);
1023         if (len >= sizeof(socket_address.sun_path)) {
1024                 TALLOC_FREE(ctx);
1025                 return ENAMETOOLONG;
1026         }
1027
1028         ret = messaging_dgm_lockfile_create(ctx, ctx->pid, &ctx->lockfile_fd,
1029                                             punique);
1030         if (ret != 0) {
1031                 DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
1032                           __func__, strerror(ret)));
1033                 TALLOC_FREE(ctx);
1034                 return ret;
1035         }
1036
1037         unlink(socket_address.sun_path);
1038
1039         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
1040         if (ctx->sock == -1) {
1041                 ret = errno;
1042                 DBG_WARNING("socket failed: %s\n", strerror(ret));
1043                 TALLOC_FREE(ctx);
1044                 return ret;
1045         }
1046
1047         ret = prepare_socket_cloexec(ctx->sock);
1048         if (ret == -1) {
1049                 ret = errno;
1050                 DBG_WARNING("prepare_socket_cloexec failed: %s\n",
1051                             strerror(ret));
1052                 TALLOC_FREE(ctx);
1053                 return ret;
1054         }
1055
1056         ret = bind(ctx->sock, (struct sockaddr *)(void *)&socket_address,
1057                    sizeof(socket_address));
1058         if (ret == -1) {
1059                 ret = errno;
1060                 DBG_WARNING("bind failed: %s\n", strerror(ret));
1061                 TALLOC_FREE(ctx);
1062                 return ret;
1063         }
1064
1065         talloc_set_destructor(ctx, messaging_dgm_context_destructor);
1066
1067         ctx->have_dgm_context = &have_dgm_context;
1068
1069         ret = pthreadpool_tevent_init(ctx, UINT_MAX, &ctx->pool);
1070         if (ret != 0) {
1071                 DBG_WARNING("pthreadpool_tevent_init failed: %s\n",
1072                             strerror(ret));
1073                 TALLOC_FREE(ctx);
1074                 return ret;
1075         }
1076
1077         global_dgm_context = ctx;
1078         return 0;
1079
1080 fail_nomem:
1081         TALLOC_FREE(ctx);
1082         return ENOMEM;
1083 }
1084
1085 /*
1086  * Remove the rendezvous point in the filesystem
1087  * if we're the owner.
1088  */
1089
1090 static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
1091 {
1092         while (c->outsocks != NULL) {
1093                 TALLOC_FREE(c->outsocks);
1094         }
1095         while (c->in_msgs != NULL) {
1096                 TALLOC_FREE(c->in_msgs);
1097         }
1098         while (c->fde_evs != NULL) {
1099                 tevent_fd_set_flags(c->fde_evs->fde, 0);
1100                 c->fde_evs->ctx = NULL;
1101                 DLIST_REMOVE(c->fde_evs, c->fde_evs);
1102         }
1103
1104         close(c->sock);
1105
1106         if (getpid() == c->pid) {
1107                 struct sun_path_buf name;
1108                 int ret;
1109
1110                 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1111                                c->socket_dir.buf, (unsigned)c->pid);
1112                 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1113                         /*
1114                          * We've checked the length when creating, so this
1115                          * should never happen
1116                          */
1117                         abort();
1118                 }
1119                 unlink(name.buf);
1120
1121                 ret = snprintf(name.buf, sizeof(name.buf), "%s/%u",
1122                                c->lockfile_dir.buf, (unsigned)c->pid);
1123                 if ((ret < 0) || ((size_t)ret >= sizeof(name.buf))) {
1124                         /*
1125                          * We've checked the length when creating, so this
1126                          * should never happen
1127                          */
1128                         abort();
1129                 }
1130                 unlink(name.buf);
1131         }
1132         close(c->lockfile_fd);
1133
1134         if (c->have_dgm_context != NULL) {
1135                 *c->have_dgm_context = false;
1136         }
1137
1138         return 0;
1139 }
1140
1141 static void messaging_dgm_validate(struct messaging_dgm_context *ctx)
1142 {
1143 #ifdef DEVELOPER
1144         pid_t pid = getpid();
1145         struct sockaddr_storage addr;
1146         socklen_t addrlen = sizeof(addr);
1147         struct sockaddr_un *un_addr;
1148         struct sun_path_buf pathbuf;
1149         struct stat st1, st2;
1150         int ret;
1151
1152         /*
1153          * Protect against using the wrong messaging context after a
1154          * fork without reinit_after_fork.
1155          */
1156
1157         ret = getsockname(ctx->sock, (struct sockaddr *)&addr, &addrlen);
1158         if (ret == -1) {
1159                 DBG_ERR("getsockname failed: %s\n", strerror(errno));
1160                 goto fail;
1161         }
1162         if (addr.ss_family != AF_UNIX) {
1163                 DBG_ERR("getsockname returned family %d\n",
1164                         (int)addr.ss_family);
1165                 goto fail;
1166         }
1167         un_addr = (struct sockaddr_un *)&addr;
1168
1169         ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
1170                        "%s/%u", ctx->socket_dir.buf, (unsigned)pid);
1171         if (ret < 0) {
1172                 DBG_ERR("snprintf failed: %s\n", strerror(errno));
1173                 goto fail;
1174         }
1175         if ((size_t)ret >= sizeof(pathbuf.buf)) {
1176                 DBG_ERR("snprintf returned %d chars\n", (int)ret);
1177                 goto fail;
1178         }
1179
1180         if (strcmp(pathbuf.buf, un_addr->sun_path) != 0) {
1181                 DBG_ERR("sockname wrong: Expected %s, got %s\n",
1182                         pathbuf.buf, un_addr->sun_path);
1183                 goto fail;
1184         }
1185
1186         ret = snprintf(pathbuf.buf, sizeof(pathbuf.buf),
1187                        "%s/%u", ctx->lockfile_dir.buf, (unsigned)pid);
1188         if (ret < 0) {
1189                 DBG_ERR("snprintf failed: %s\n", strerror(errno));
1190                 goto fail;
1191         }
1192         if ((size_t)ret >= sizeof(pathbuf.buf)) {
1193                 DBG_ERR("snprintf returned %d chars\n", (int)ret);
1194                 goto fail;
1195         }
1196
1197         ret = stat(pathbuf.buf, &st1);
1198         if (ret == -1) {
1199                 DBG_ERR("stat failed: %s\n", strerror(errno));
1200                 goto fail;
1201         }
1202         ret = fstat(ctx->lockfile_fd, &st2);
1203         if (ret == -1) {
1204                 DBG_ERR("fstat failed: %s\n", strerror(errno));
1205                 goto fail;
1206         }
1207
1208         if ((st1.st_dev != st2.st_dev) || (st1.st_ino != st2.st_ino)) {
1209                 DBG_ERR("lockfile differs, expected (%d/%d), got (%d/%d)\n",
1210                         (int)st2.st_dev, (int)st2.st_ino,
1211                         (int)st1.st_dev, (int)st1.st_ino);
1212                 goto fail;
1213         }
1214
1215         return;
1216 fail:
1217         abort();
1218 #else
1219         return;
1220 #endif
1221 }
1222
1223 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1224                                struct tevent_context *ev,
1225                                uint8_t *msg, size_t msg_len,
1226                                int *fds, size_t num_fds);
1227
1228 /*
1229  * Raw read callback handler - passes to messaging_dgm_recv()
1230  * for fragment reassembly processing.
1231  */
1232
1233 static void messaging_dgm_read_handler(struct tevent_context *ev,
1234                                        struct tevent_fd *fde,
1235                                        uint16_t flags,
1236                                        void *private_data)
1237 {
1238         struct messaging_dgm_context *ctx = talloc_get_type_abort(
1239                 private_data, struct messaging_dgm_context);
1240         ssize_t received;
1241         struct msghdr msg;
1242         struct iovec iov;
1243         size_t msgbufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
1244         uint8_t msgbuf[msgbufsize];
1245         uint8_t buf[MESSAGING_DGM_FRAGMENT_LENGTH];
1246         size_t num_fds;
1247
1248         messaging_dgm_validate(ctx);
1249
1250         if ((flags & TEVENT_FD_READ) == 0) {
1251                 return;
1252         }
1253
1254         iov = (struct iovec) { .iov_base = buf, .iov_len = sizeof(buf) };
1255         msg = (struct msghdr) { .msg_iov = &iov, .msg_iovlen = 1 };
1256
1257         msghdr_prep_recv_fds(&msg, msgbuf, msgbufsize, INT8_MAX);
1258
1259 #ifdef MSG_CMSG_CLOEXEC
1260         msg.msg_flags |= MSG_CMSG_CLOEXEC;
1261 #endif
1262
1263         received = recvmsg(ctx->sock, &msg, 0);
1264         if (received == -1) {
1265                 if ((errno == EAGAIN) ||
1266                     (errno == EWOULDBLOCK) ||
1267                     (errno == EINTR) ||
1268                     (errno == ENOMEM)) {
1269                         /* Not really an error - just try again. */
1270                         return;
1271                 }
1272                 /* Problem with the socket. Set it unreadable. */
1273                 tevent_fd_set_flags(fde, 0);
1274                 return;
1275         }
1276
1277         if ((size_t)received > sizeof(buf)) {
1278                 /* More than we expected, not for us */
1279                 return;
1280         }
1281
1282         num_fds = msghdr_extract_fds(&msg, NULL, 0);
1283         if (num_fds == 0) {
1284                 int fds[1];
1285
1286                 messaging_dgm_recv(ctx, ev, buf, received, fds, 0);
1287         } else {
1288                 size_t i;
1289                 int fds[num_fds];
1290
1291                 msghdr_extract_fds(&msg, fds, num_fds);
1292
1293                 for (i = 0; i < num_fds; i++) {
1294                         int err;
1295
1296                         err = prepare_socket_cloexec(fds[i]);
1297                         if (err != 0) {
1298                                 close_fd_array(fds, num_fds);
1299                                 num_fds = 0;
1300                         }
1301                 }
1302
1303                 messaging_dgm_recv(ctx, ev, buf, received, fds, num_fds);
1304         }
1305 }
1306
1307 static int messaging_dgm_in_msg_destructor(struct messaging_dgm_in_msg *m)
1308 {
1309         DLIST_REMOVE(m->ctx->in_msgs, m);
1310         return 0;
1311 }
1312
1313 /*
1314  * Deal with identification of fragmented messages and
1315  * re-assembly into full messages sent, then calls the
1316  * callback.
1317  */
1318
1319 static void messaging_dgm_recv(struct messaging_dgm_context *ctx,
1320                                struct tevent_context *ev,
1321                                uint8_t *buf, size_t buflen,
1322                                int *fds, size_t num_fds)
1323 {
1324         struct messaging_dgm_fragment_hdr hdr;
1325         struct messaging_dgm_in_msg *msg;
1326         size_t space;
1327         uint64_t cookie;
1328
1329         if (buflen < sizeof(cookie)) {
1330                 goto close_fds;
1331         }
1332         memcpy(&cookie, buf, sizeof(cookie));
1333         buf += sizeof(cookie);
1334         buflen -= sizeof(cookie);
1335
1336         if (cookie == 0) {
1337                 ctx->recv_cb(ev, buf, buflen, fds, num_fds,
1338                              ctx->recv_cb_private_data);
1339                 return;
1340         }
1341
1342         if (buflen < sizeof(hdr)) {
1343                 goto close_fds;
1344         }
1345         memcpy(&hdr, buf, sizeof(hdr));
1346         buf += sizeof(hdr);
1347         buflen -= sizeof(hdr);
1348
1349         for (msg = ctx->in_msgs; msg != NULL; msg = msg->next) {
1350                 if ((msg->sender_pid == hdr.pid) &&
1351                     (msg->sender_sock == hdr.sock)) {
1352                         break;
1353                 }
1354         }
1355
1356         if ((msg != NULL) && (msg->cookie != cookie)) {
1357                 TALLOC_FREE(msg);
1358         }
1359
1360         if (msg == NULL) {
1361                 size_t msglen;
1362                 msglen = offsetof(struct messaging_dgm_in_msg, buf) +
1363                         hdr.msglen;
1364
1365                 msg = talloc_size(ctx, msglen);
1366                 if (msg == NULL) {
1367                         goto close_fds;
1368                 }
1369                 talloc_set_name_const(msg, "struct messaging_dgm_in_msg");
1370
1371                 *msg = (struct messaging_dgm_in_msg) {
1372                         .ctx = ctx, .msglen = hdr.msglen,
1373                         .sender_pid = hdr.pid, .sender_sock = hdr.sock,
1374                         .cookie = cookie
1375                 };
1376                 DLIST_ADD(ctx->in_msgs, msg);
1377                 talloc_set_destructor(msg, messaging_dgm_in_msg_destructor);
1378         }
1379
1380         space = msg->msglen - msg->received;
1381         if (buflen > space) {
1382                 goto close_fds;
1383         }
1384
1385         memcpy(msg->buf + msg->received, buf, buflen);
1386         msg->received += buflen;
1387
1388         if (msg->received < msg->msglen) {
1389                 /*
1390                  * Any valid sender will send the fds in the last
1391                  * block. Invalid senders might have sent fd's that we
1392                  * need to close here.
1393                  */
1394                 goto close_fds;
1395         }
1396
1397         DLIST_REMOVE(ctx->in_msgs, msg);
1398         talloc_set_destructor(msg, NULL);
1399
1400         ctx->recv_cb(ev, msg->buf, msg->msglen, fds, num_fds,
1401                      ctx->recv_cb_private_data);
1402
1403         TALLOC_FREE(msg);
1404         return;
1405
1406 close_fds:
1407         close_fd_array(fds, num_fds);
1408 }
1409
1410 void messaging_dgm_destroy(void)
1411 {
1412         TALLOC_FREE(global_dgm_context);
1413 }
1414
1415 int messaging_dgm_send(pid_t pid,
1416                        const struct iovec *iov, int iovlen,
1417                        const int *fds, size_t num_fds)
1418 {
1419         struct messaging_dgm_context *ctx = global_dgm_context;
1420         struct messaging_dgm_out *out;
1421         int ret;
1422
1423         if (ctx == NULL) {
1424                 return ENOTCONN;
1425         }
1426
1427         messaging_dgm_validate(ctx);
1428
1429         ret = messaging_dgm_out_get(ctx, pid, &out);
1430         if (ret != 0) {
1431                 return ret;
1432         }
1433
1434         DEBUG(10, ("%s: Sending message to %u\n", __func__, (unsigned)pid));
1435
1436         ret = messaging_dgm_out_send_fragmented(ctx->ev, out, iov, iovlen,
1437                                                 fds, num_fds);
1438         return ret;
1439 }
1440
1441 static int messaging_dgm_read_unique(int fd, uint64_t *punique)
1442 {
1443         char buf[25];
1444         ssize_t rw_ret;
1445         unsigned long long unique;
1446         char *endptr;
1447
1448         rw_ret = pread(fd, buf, sizeof(buf)-1, 0);
1449         if (rw_ret == -1) {
1450                 return errno;
1451         }
1452         buf[rw_ret] = '\0';
1453
1454         unique = strtoull(buf, &endptr, 10);
1455         if ((unique == 0) && (errno == EINVAL)) {
1456                 return EINVAL;
1457         }
1458         if ((unique == ULLONG_MAX) && (errno == ERANGE)) {
1459                 return ERANGE;
1460         }
1461         if (endptr[0] != '\n') {
1462                 return EINVAL;
1463         }
1464         *punique = unique;
1465         return 0;
1466 }
1467
1468 int messaging_dgm_get_unique(pid_t pid, uint64_t *unique)
1469 {
1470         struct messaging_dgm_context *ctx = global_dgm_context;
1471         struct sun_path_buf lockfile_name;
1472         int ret, fd;
1473
1474         if (ctx == NULL) {
1475                 return EBADF;
1476         }
1477
1478         messaging_dgm_validate(ctx);
1479
1480         if (pid == getpid()) {
1481                 /*
1482                  * Protect against losing our own lock
1483                  */
1484                 return messaging_dgm_read_unique(ctx->lockfile_fd, unique);
1485         }
1486
1487         ret = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf),
1488                        "%s/%u", ctx->lockfile_dir.buf, (int)pid);
1489         if (ret < 0) {
1490                 return errno;
1491         }
1492         if ((size_t)ret >= sizeof(lockfile_name.buf)) {
1493                 return ENAMETOOLONG;
1494         }
1495
1496         fd = open(lockfile_name.buf, O_NONBLOCK|O_RDONLY, 0);
1497         if (fd == -1) {
1498                 return errno;
1499         }
1500
1501         ret = messaging_dgm_read_unique(fd, unique);
1502         close(fd);
1503         return ret;
1504 }
1505
1506 int messaging_dgm_cleanup(pid_t pid)
1507 {
1508         struct messaging_dgm_context *ctx = global_dgm_context;
1509         struct sun_path_buf lockfile_name, socket_name;
1510         int fd, len, ret;
1511         struct flock lck = {};
1512
1513         if (ctx == NULL) {
1514                 return ENOTCONN;
1515         }
1516
1517         len = snprintf(socket_name.buf, sizeof(socket_name.buf), "%s/%u",
1518                        ctx->socket_dir.buf, (unsigned)pid);
1519         if (len < 0) {
1520                 return errno;
1521         }
1522         if ((size_t)len >= sizeof(socket_name.buf)) {
1523                 return ENAMETOOLONG;
1524         }
1525
1526         len = snprintf(lockfile_name.buf, sizeof(lockfile_name.buf), "%s/%u",
1527                        ctx->lockfile_dir.buf, (unsigned)pid);
1528         if (len < 0) {
1529                 return errno;
1530         }
1531         if ((size_t)len >= sizeof(lockfile_name.buf)) {
1532                 return ENAMETOOLONG;
1533         }
1534
1535         fd = open(lockfile_name.buf, O_NONBLOCK|O_WRONLY, 0);
1536         if (fd == -1) {
1537                 ret = errno;
1538                 if (ret != ENOENT) {
1539                         DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
1540                                    lockfile_name.buf, strerror(ret)));
1541                 }
1542                 return ret;
1543         }
1544
1545         lck.l_type = F_WRLCK;
1546         lck.l_whence = SEEK_SET;
1547         lck.l_start = 0;
1548         lck.l_len = 0;
1549
1550         ret = fcntl(fd, F_SETLK, &lck);
1551         if (ret != 0) {
1552                 ret = errno;
1553                 if ((ret != EACCES) && (ret != EAGAIN)) {
1554                         DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
1555                                    strerror(ret)));
1556                 }
1557                 close(fd);
1558                 return ret;
1559         }
1560
1561         DEBUG(10, ("%s: Cleaning up : %s\n", __func__, strerror(ret)));
1562
1563         (void)unlink(socket_name.buf);
1564         (void)unlink(lockfile_name.buf);
1565         (void)close(fd);
1566         return 0;
1567 }
1568
1569 static int messaging_dgm_wipe_fn(pid_t pid, void *private_data)
1570 {
1571         pid_t *our_pid = (pid_t *)private_data;
1572         int ret;
1573
1574         if (pid == *our_pid) {
1575                 /*
1576                  * fcntl(F_GETLK) will succeed for ourselves, we hold
1577                  * that lock ourselves.
1578                  */
1579                 return 0;
1580         }
1581
1582         ret = messaging_dgm_cleanup(pid);
1583         DEBUG(10, ("messaging_dgm_cleanup(%lu) returned %s\n",
1584                    (unsigned long)pid, ret ? strerror(ret) : "ok"));
1585
1586         return 0;
1587 }
1588
1589 int messaging_dgm_wipe(void)
1590 {
1591         pid_t pid = getpid();
1592         messaging_dgm_forall(messaging_dgm_wipe_fn, &pid);
1593         return 0;
1594 }
1595
1596 int messaging_dgm_forall(int (*fn)(pid_t pid, void *private_data),
1597                          void *private_data)
1598 {
1599         struct messaging_dgm_context *ctx = global_dgm_context;
1600         DIR *msgdir;
1601         struct dirent *dp;
1602
1603         if (ctx == NULL) {
1604                 return ENOTCONN;
1605         }
1606
1607         messaging_dgm_validate(ctx);
1608
1609         /*
1610          * We scan the socket directory and not the lock directory. Otherwise
1611          * we would race against messaging_dgm_lockfile_create's open(O_CREAT)
1612          * and fcntl(SETLK).
1613          */
1614
1615         msgdir = opendir(ctx->socket_dir.buf);
1616         if (msgdir == NULL) {
1617                 return errno;
1618         }
1619
1620         while ((dp = readdir(msgdir)) != NULL) {
1621                 unsigned long pid;
1622                 int ret;
1623
1624                 pid = strtoul(dp->d_name, NULL, 10);
1625                 if (pid == 0) {
1626                         /*
1627                          * . and .. and other malformed entries
1628                          */
1629                         continue;
1630                 }
1631
1632                 ret = fn(pid, private_data);
1633                 if (ret != 0) {
1634                         break;
1635                 }
1636         }
1637         closedir(msgdir);
1638
1639         return 0;
1640 }
1641
1642 struct messaging_dgm_fde {
1643         struct tevent_fd *fde;
1644 };
1645
1646 static int messaging_dgm_fde_ev_destructor(struct messaging_dgm_fde_ev *fde_ev)
1647 {
1648         if (fde_ev->ctx != NULL) {
1649                 DLIST_REMOVE(fde_ev->ctx->fde_evs, fde_ev);
1650                 fde_ev->ctx = NULL;
1651         }
1652         return 0;
1653 }
1654
1655 /*
1656  * Reference counter for a struct tevent_fd messaging read event
1657  * (with callback function) on a struct tevent_context registered
1658  * on a messaging context.
1659  *
1660  * If we've already registered this struct tevent_context before
1661  * (so already have a read event), just increase the reference count.
1662  *
1663  * Otherwise create a new struct tevent_fd messaging read event on the
1664  * previously unseen struct tevent_context - this is what drives
1665  * the message receive processing.
1666  *
1667  */
1668
1669 struct messaging_dgm_fde *messaging_dgm_register_tevent_context(
1670         TALLOC_CTX *mem_ctx, struct tevent_context *ev)
1671 {
1672         struct messaging_dgm_context *ctx = global_dgm_context;
1673         struct messaging_dgm_fde_ev *fde_ev;
1674         struct messaging_dgm_fde *fde;
1675
1676         if (ctx == NULL) {
1677                 return NULL;
1678         }
1679
1680         fde = talloc(mem_ctx, struct messaging_dgm_fde);
1681         if (fde == NULL) {
1682                 return NULL;
1683         }
1684
1685         for (fde_ev = ctx->fde_evs; fde_ev != NULL; fde_ev = fde_ev->next) {
1686                 if (tevent_fd_get_flags(fde_ev->fde) == 0) {
1687                         /*
1688                          * If the event context got deleted,
1689                          * tevent_fd_get_flags() will return 0
1690                          * for the stale fde.
1691                          *
1692                          * In that case we should not
1693                          * use fde_ev->ev anymore.
1694                          */
1695                         continue;
1696                 }
1697                 if (fde_ev->ev == ev) {
1698                         break;
1699                 }
1700         }
1701
1702         if (fde_ev == NULL) {
1703                 fde_ev = talloc(fde, struct messaging_dgm_fde_ev);
1704                 if (fde_ev == NULL) {
1705                         return NULL;
1706                 }
1707                 fde_ev->fde = tevent_add_fd(
1708                         ev, fde_ev, ctx->sock, TEVENT_FD_READ,
1709                         messaging_dgm_read_handler, ctx);
1710                 if (fde_ev->fde == NULL) {
1711                         TALLOC_FREE(fde);
1712                         return NULL;
1713                 }
1714                 fde_ev->ev = ev;
1715                 fde_ev->ctx = ctx;
1716                 DLIST_ADD(ctx->fde_evs, fde_ev);
1717                 talloc_set_destructor(
1718                         fde_ev, messaging_dgm_fde_ev_destructor);
1719         } else {
1720                 /*
1721                  * Same trick as with tdb_wrap: The caller will never
1722                  * see the talloc_referenced object, the
1723                  * messaging_dgm_fde_ev, so problems with
1724                  * talloc_unlink will not happen.
1725                  */
1726                 if (talloc_reference(fde, fde_ev) == NULL) {
1727                         TALLOC_FREE(fde);
1728                         return NULL;
1729                 }
1730         }
1731
1732         fde->fde = fde_ev->fde;
1733         return fde;
1734 }
1735
1736 bool messaging_dgm_fde_active(struct messaging_dgm_fde *fde)
1737 {
1738         uint16_t flags;
1739
1740         if (fde == NULL) {
1741                 return false;
1742         }
1743         flags = tevent_fd_get_flags(fde->fde);
1744         return (flags != 0);
1745 }