lib: unix_dgram_msg does not need "num_fds"
[samba.git] / source3 / lib / unix_msg / unix_msg.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Copyright (C) Volker Lendecke 2013
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 #include "replace.h"
20 #include "unix_msg.h"
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "dlinklist.h"
25 #include "pthreadpool/pthreadpool.h"
26 #include "lib/iov_buf.h"
27 #include <fcntl.h>
28
29 /*
30  * This file implements two abstractions: The "unix_dgram" functions implement
31  * queueing for unix domain datagram sockets. You can send to a destination
32  * socket, and if that has no free space available, it will fall back to an
33  * anonymous socket that will poll for writability. "unix_dgram" expects the
34  * data size not to exceed the system limit.
35  *
36  * The "unix_msg" functions implement the fragmentation of large messages on
37  * top of "unix_dgram". This is what is exposed to the user of this API.
38  */
39
40 struct unix_dgram_msg {
41         struct unix_dgram_msg *prev, *next;
42
43         int sock;
44         ssize_t sent;
45         int sys_errno;
46         struct msghdr msg;
47         struct iovec iov;
48 };
49
50 struct unix_dgram_send_queue {
51         struct unix_dgram_send_queue *prev, *next;
52         struct unix_dgram_ctx *ctx;
53         int sock;
54         struct unix_dgram_msg *msgs;
55         char path[];
56 };
57
58 struct unix_dgram_ctx {
59         int sock;
60         pid_t created_pid;
61         const struct poll_funcs *ev_funcs;
62         size_t max_msg;
63
64         void (*recv_callback)(struct unix_dgram_ctx *ctx,
65                               uint8_t *msg, size_t msg_len,
66                               int *fds, size_t num_fds,
67                               void *private_data);
68         void *private_data;
69
70         struct poll_watch *sock_read_watch;
71         struct unix_dgram_send_queue *send_queues;
72
73         struct pthreadpool *send_pool;
74         struct poll_watch *pool_read_watch;
75
76         uint8_t *recv_buf;
77         char path[];
78 };
79
80 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
81                                     void *private_data);
82
83 /* Set socket non blocking. */
84 static int prepare_socket_nonblock(int sock)
85 {
86         int flags;
87 #ifdef O_NONBLOCK
88 #define FLAG_TO_SET O_NONBLOCK
89 #else
90 #ifdef SYSV
91 #define FLAG_TO_SET O_NDELAY
92 #else /* BSD */
93 #define FLAG_TO_SET FNDELAY
94 #endif
95 #endif
96
97         flags = fcntl(sock, F_GETFL);
98         if (flags == -1) {
99                 return errno;
100         }
101         flags |= FLAG_TO_SET;
102         if (fcntl(sock, F_SETFL, flags) == -1) {
103                 return errno;
104         }
105
106 #undef FLAG_TO_SET
107         return 0;
108 }
109
110 /* Set socket close on exec. */
111 static int prepare_socket_cloexec(int sock)
112 {
113 #ifdef FD_CLOEXEC
114         int flags;
115
116         flags = fcntl(sock, F_GETFD, 0);
117         if (flags == -1) {
118                 return errno;
119         }
120         flags |= FD_CLOEXEC;
121         if (fcntl(sock, F_SETFD, flags) == -1) {
122                 return errno;
123         }
124 #endif
125         return 0;
126 }
127
128 /* Set socket non blocking and close on exec. */
129 static int prepare_socket(int sock)
130 {
131         int ret = prepare_socket_nonblock(sock);
132
133         if (ret) {
134                 return ret;
135         }
136         return prepare_socket_cloexec(sock);
137 }
138
139 static void extract_fd_array_from_msghdr(struct msghdr *msg, int **fds,
140                                          size_t *num_fds)
141 {
142 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
143         struct cmsghdr *cmsg;
144
145         for(cmsg = CMSG_FIRSTHDR(msg);
146             cmsg != NULL;
147             cmsg = CMSG_NXTHDR(msg, cmsg))
148         {
149                 void *data = CMSG_DATA(cmsg);
150
151                 if (cmsg->cmsg_type != SCM_RIGHTS) {
152                         continue;
153                 }
154                 if (cmsg->cmsg_level != SOL_SOCKET) {
155                         continue;
156                 }
157
158                 *fds = (int *)data;
159                 *num_fds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof (int);
160                 break;
161         }
162 #endif
163 }
164
165 static void close_fd_array(int *fds, size_t num_fds)
166 {
167         size_t i;
168
169         for (i = 0; i < num_fds; i++) {
170                 if (fds[i] == -1) {
171                         continue;
172                 }
173
174                 close(fds[i]);
175                 fds[i] = -1;
176         }
177 }
178
179 static void close_fd_array_cmsg(struct msghdr *msg)
180 {
181         int *fds = NULL;
182         size_t num_fds = 0;
183
184         extract_fd_array_from_msghdr(msg, &fds, &num_fds);
185
186         /*
187          * TODO: caveat - side-effect - changing msg ???
188          */
189         close_fd_array(fds, num_fds);
190 }
191
192 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
193                            const struct poll_funcs *ev_funcs,
194                            void (*recv_callback)(struct unix_dgram_ctx *ctx,
195                                                  uint8_t *msg, size_t msg_len,
196                                                  int *fds, size_t num_fds,
197                                                  void *private_data),
198                            void *private_data,
199                            struct unix_dgram_ctx **result)
200 {
201         struct unix_dgram_ctx *ctx;
202         size_t pathlen;
203         int ret;
204
205         if (addr != NULL) {
206                 pathlen = strlen(addr->sun_path)+1;
207         } else {
208                 pathlen = 1;
209         }
210
211         ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
212         if (ctx == NULL) {
213                 return ENOMEM;
214         }
215         if (addr != NULL) {
216                 memcpy(ctx->path, addr->sun_path, pathlen);
217         } else {
218                 ctx->path[0] = '\0';
219         }
220
221         *ctx = (struct unix_dgram_ctx) {
222                 .max_msg = max_msg,
223                 .ev_funcs = ev_funcs,
224                 .recv_callback = recv_callback,
225                 .private_data = private_data,
226                 .created_pid = (pid_t)-1
227         };
228
229         ctx->recv_buf = malloc(max_msg);
230         if (ctx->recv_buf == NULL) {
231                 free(ctx);
232                 return ENOMEM;
233         }
234
235         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
236         if (ctx->sock == -1) {
237                 ret = errno;
238                 goto fail_free;
239         }
240
241         /* Set non-blocking and close-on-exec. */
242         ret = prepare_socket(ctx->sock);
243         if (ret != 0) {
244                 goto fail_close;
245         }
246
247         if (addr != NULL) {
248                 ret = bind(ctx->sock,
249                            (const struct sockaddr *)(const void *)addr,
250                            sizeof(*addr));
251                 if (ret == -1) {
252                         ret = errno;
253                         goto fail_close;
254                 }
255
256                 ctx->created_pid = getpid();
257
258                 ctx->sock_read_watch = ctx->ev_funcs->watch_new(
259                         ctx->ev_funcs, ctx->sock, POLLIN,
260                         unix_dgram_recv_handler, ctx);
261
262                 if (ctx->sock_read_watch == NULL) {
263                         ret = ENOMEM;
264                         goto fail_close;
265                 }
266         }
267
268         *result = ctx;
269         return 0;
270
271 fail_close:
272         close(ctx->sock);
273 fail_free:
274         free(ctx->recv_buf);
275         free(ctx);
276         return ret;
277 }
278
279 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
280                                     void *private_data)
281 {
282         struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
283         ssize_t received;
284         int flags = 0;
285         struct msghdr msg;
286         struct iovec iov;
287 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
288         char buf[CMSG_SPACE(sizeof(int)*INT8_MAX)] = { 0, };
289 #endif /* HAVE_STRUCT_MSGHDR_MSG_CONTROL */
290         int *fds = NULL;
291         size_t i, num_fds = 0;
292
293         iov = (struct iovec) {
294                 .iov_base = (void *)ctx->recv_buf,
295                 .iov_len = ctx->max_msg,
296         };
297
298         msg = (struct msghdr) {
299                 .msg_iov = &iov,
300                 .msg_iovlen = 1,
301 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
302                 .msg_control = buf,
303                 .msg_controllen = sizeof(buf),
304 #endif
305         };
306
307 #ifdef MSG_CMSG_CLOEXEC
308         flags |= MSG_CMSG_CLOEXEC;
309 #endif
310
311         received = recvmsg(fd, &msg, flags);
312         if (received == -1) {
313                 if ((errno == EAGAIN) ||
314                     (errno == EWOULDBLOCK) ||
315                     (errno == EINTR) || (errno == ENOMEM)) {
316                         /* Not really an error - just try again. */
317                         return;
318                 }
319                 /* Problem with the socket. Set it unreadable. */
320                 ctx->ev_funcs->watch_update(w, 0);
321                 return;
322         }
323         if (received > ctx->max_msg) {
324                 /* More than we expected, not for us */
325                 return;
326         }
327
328         extract_fd_array_from_msghdr(&msg, &fds, &num_fds);
329
330         for (i = 0; i < num_fds; i++) {
331                 int err;
332
333                 err = prepare_socket_cloexec(fds[i]);
334                 if (err != 0) {
335                         goto cleanup_fds;
336                 }
337         }
338
339         ctx->recv_callback(ctx, ctx->recv_buf, received,
340                            fds, num_fds, ctx->private_data);
341
342         /*
343          * Close those fds that the callback has not set to -1.
344          */
345         close_fd_array(fds, num_fds);
346
347         return;
348
349 cleanup_fds:
350         close_fd_array(fds, num_fds);
351
352         ctx->recv_callback(ctx, ctx->recv_buf, received,
353                            NULL, 0, ctx->private_data);
354 }
355
356 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
357                                     void *private_data);
358
359 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
360 {
361         int ret, signalfd;
362
363         if (ctx->send_pool != NULL) {
364                 return 0;
365         }
366
367         ret = pthreadpool_init(0, &ctx->send_pool);
368         if (ret != 0) {
369                 return ret;
370         }
371
372         signalfd = pthreadpool_signal_fd(ctx->send_pool);
373
374         ctx->pool_read_watch = ctx->ev_funcs->watch_new(
375                 ctx->ev_funcs, signalfd, POLLIN,
376                 unix_dgram_job_finished, ctx);
377         if (ctx->pool_read_watch == NULL) {
378                 pthreadpool_destroy(ctx->send_pool);
379                 ctx->send_pool = NULL;
380                 return ENOMEM;
381         }
382
383         return 0;
384 }
385
386 static int unix_dgram_send_queue_init(
387         struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
388         struct unix_dgram_send_queue **result)
389 {
390         struct unix_dgram_send_queue *q;
391         size_t pathlen;
392         int ret, err;
393
394         pathlen = strlen(dst->sun_path)+1;
395
396         q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
397         if (q == NULL) {
398                 return ENOMEM;
399         }
400         q->ctx = ctx;
401         q->msgs = NULL;
402         memcpy(q->path, dst->sun_path, pathlen);
403
404         q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
405         if (q->sock == -1) {
406                 err = errno;
407                 goto fail_free;
408         }
409
410         err = prepare_socket_cloexec(q->sock);
411         if (err != 0) {
412                 goto fail_close;
413         }
414
415         do {
416                 ret = connect(q->sock,
417                               (const struct sockaddr *)(const void *)dst,
418                               sizeof(*dst));
419         } while ((ret == -1) && (errno == EINTR));
420
421         if (ret == -1) {
422                 err = errno;
423                 goto fail_close;
424         }
425
426         err = unix_dgram_init_pthreadpool(ctx);
427         if (err != 0) {
428                 goto fail_close;
429         }
430
431         DLIST_ADD(ctx->send_queues, q);
432
433         *result = q;
434         return 0;
435
436 fail_close:
437         close(q->sock);
438 fail_free:
439         free(q);
440         return err;
441 }
442
443 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
444 {
445         struct unix_dgram_ctx *ctx = q->ctx;
446
447         while (q->msgs != NULL) {
448                 struct unix_dgram_msg *msg;
449                 msg = q->msgs;
450                 DLIST_REMOVE(q->msgs, msg);
451                 close_fd_array_cmsg(&msg->msg);
452                 free(msg);
453         }
454         close(q->sock);
455         DLIST_REMOVE(ctx->send_queues, q);
456         free(q);
457 }
458
459 static struct unix_dgram_send_queue *find_send_queue(
460         struct unix_dgram_ctx *ctx, const char *dst_sock)
461 {
462         struct unix_dgram_send_queue *s;
463
464         for (s = ctx->send_queues; s != NULL; s = s->next) {
465                 if (strcmp(s->path, dst_sock) == 0) {
466                         return s;
467                 }
468         }
469         return NULL;
470 }
471
472 static int queue_msg(struct unix_dgram_send_queue *q,
473                      const struct iovec *iov, int iovlen,
474                      const int *fds, size_t num_fds)
475 {
476         struct unix_dgram_msg *msg;
477         ssize_t data_len;
478         uint8_t *data_buf;
479         size_t msglen = sizeof(struct unix_dgram_msg);
480         int i;
481         size_t tmp;
482         int ret = -1;
483 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
484         size_t fds_size = sizeof(int) * MIN(num_fds, INT8_MAX);
485         int fds_copy[MIN(num_fds, INT8_MAX)];
486         size_t cmsg_len = CMSG_LEN(fds_size);
487         size_t cmsg_space = CMSG_SPACE(fds_size);
488         char *cmsg_buf;
489
490         /*
491          * Note: No need to check for overflow here,
492          * since cmsg will store <= INT8_MAX fds.
493          */
494         msglen += cmsg_space;
495
496 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
497
498         if (num_fds > INT8_MAX) {
499                 return EINVAL;
500         }
501
502 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
503         if (num_fds > 0) {
504                 return ENOSYS;
505         }
506 #endif
507
508         data_len = iov_buflen(iov, iovlen);
509         if (data_len == -1) {
510                 return EINVAL;
511         }
512
513         tmp = msglen + data_len;
514         if ((tmp < msglen) || (tmp < data_len)) {
515                 /* overflow */
516                 return EINVAL;
517         }
518         msglen = tmp;
519
520 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
521         for (i = 0; i < num_fds; i++) {
522                 fds_copy[i] = -1;
523         }
524
525         for (i = 0; i < num_fds; i++) {
526                 fds_copy[i] = dup(fds[i]);
527                 if (fds_copy[i] == -1) {
528                         ret = errno;
529                         goto fail;
530                 }
531         }
532 #endif
533
534         msg = malloc(msglen);
535         if (msg == NULL) {
536                 ret = ENOMEM;
537                 goto fail;
538         }
539
540         msg->sock = q->sock;
541
542         data_buf = (uint8_t *)(msg + 1);
543
544 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
545         if (num_fds > 0) {
546                 cmsg_buf = (char *)data_buf;
547                 memset(cmsg_buf, 0, cmsg_space);
548                 data_buf += cmsg_space;
549         } else {
550                 cmsg_buf = NULL;
551                 cmsg_space = 0;
552         }
553 #endif
554
555         msg->iov = (struct iovec) {
556                 .iov_base = (void *)data_buf,
557                 .iov_len = data_len,
558         };
559
560         msg->msg = (struct msghdr) {
561                 .msg_iov = &msg->iov,
562                 .msg_iovlen = 1,
563 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
564                 .msg_control = cmsg_buf,
565                 .msg_controllen = cmsg_space,
566 #endif
567         };
568
569 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
570         if (num_fds > 0) {
571                 struct cmsghdr *cmsg;
572                 void *fdptr;
573
574                 cmsg = CMSG_FIRSTHDR(&msg->msg);
575                 cmsg->cmsg_level = SOL_SOCKET;
576                 cmsg->cmsg_type = SCM_RIGHTS;
577                 cmsg->cmsg_len = cmsg_len;
578                 fdptr = CMSG_DATA(cmsg);
579                 memcpy(fdptr, fds_copy, fds_size);
580                 msg->msg.msg_controllen = cmsg->cmsg_len;
581         }
582 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
583
584         iov_buf(iov, iovlen, data_buf, data_len);
585
586         DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
587         return 0;
588
589 fail:
590 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
591         close_fd_array(fds_copy, num_fds);
592 #endif
593         return ret;
594 }
595
596 static void unix_dgram_send_job(void *private_data)
597 {
598         struct unix_dgram_msg *dmsg = private_data;
599
600         do {
601                 dmsg->sent = sendmsg(dmsg->sock, &dmsg->msg, 0);
602         } while ((dmsg->sent == -1) && (errno == EINTR));
603
604         if (dmsg->sent == -1) {
605                 dmsg->sys_errno = errno;
606         }
607 }
608
609 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
610                                     void *private_data)
611 {
612         struct unix_dgram_ctx *ctx = private_data;
613         struct unix_dgram_send_queue *q;
614         struct unix_dgram_msg *msg;
615         int ret, job;
616
617         ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
618         if (ret != 1) {
619                 return;
620         }
621
622         for (q = ctx->send_queues; q != NULL; q = q->next) {
623                 if (job == q->sock) {
624                         break;
625                 }
626         }
627
628         if (q == NULL) {
629                 /* Huh? Should not happen */
630                 return;
631         }
632
633         msg = q->msgs;
634         DLIST_REMOVE(q->msgs, msg);
635         close_fd_array_cmsg(&msg->msg);
636         free(msg);
637
638         if (q->msgs != NULL) {
639                 ret = pthreadpool_add_job(ctx->send_pool, q->sock,
640                                           unix_dgram_send_job, q->msgs);
641                 if (ret == 0) {
642                         return;
643                 }
644         }
645
646         unix_dgram_send_queue_free(q);
647 }
648
649 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
650                            const struct sockaddr_un *dst,
651                            const struct iovec *iov, int iovlen,
652                            const int *fds, size_t num_fds)
653 {
654         struct unix_dgram_send_queue *q;
655         struct msghdr msg;
656 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
657         struct cmsghdr *cmsg;
658         size_t fds_size = sizeof(int) * num_fds;
659         size_t cmsg_len = CMSG_LEN(fds_size);
660         size_t cmsg_space = CMSG_SPACE(fds_size);
661         char cmsg_buf[cmsg_space];
662 #endif /* HAVE_STRUCT_MSGHDR_MSG_CONTROL */
663         int ret;
664         int i;
665
666         if (num_fds > INT8_MAX) {
667                 return EINVAL;
668         }
669
670 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
671         if (num_fds > 0) {
672                 return ENOSYS;
673         }
674 #endif /* ! HAVE_STRUCT_MSGHDR_MSG_CONTROL */
675
676         for (i = 0; i < num_fds; i++) {
677                 /*
678                  * Make sure we only allow fd passing
679                  * for communication channels,
680                  * e.g. sockets, pipes, fifos, ...
681                  */
682                 ret = lseek(fds[i], 0, SEEK_CUR);
683                 if (ret == -1 && errno == ESPIPE) {
684                         /* ok */
685                         continue;
686                 }
687
688                 /*
689                  * Reject the message as we may need to call dup(),
690                  * if we queue the message.
691                  *
692                  * That might result in unexpected behavior for the caller
693                  * for files and broken posix locking.
694                  */
695                 return EINVAL;
696         }
697
698         /*
699          * To preserve message ordering, we have to queue a message when
700          * others are waiting in line already.
701          */
702         q = find_send_queue(ctx, dst->sun_path);
703         if (q != NULL) {
704                 return queue_msg(q, iov, iovlen, fds, num_fds);
705         }
706
707         /*
708          * Try a cheap nonblocking send
709          */
710
711         msg = (struct msghdr) {
712                 .msg_name = discard_const_p(struct sockaddr_un, dst),
713                 .msg_namelen = sizeof(*dst),
714                 .msg_iov = discard_const_p(struct iovec, iov),
715                 .msg_iovlen = iovlen
716         };
717 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
718         if (num_fds > 0) {
719                 void *fdptr;
720
721                 memset(cmsg_buf, 0, cmsg_space);
722
723                 msg.msg_control = cmsg_buf;
724                 msg.msg_controllen = cmsg_space;
725                 cmsg = CMSG_FIRSTHDR(&msg);
726                 cmsg->cmsg_level = SOL_SOCKET;
727                 cmsg->cmsg_type = SCM_RIGHTS;
728                 cmsg->cmsg_len = cmsg_len;
729                 fdptr = CMSG_DATA(cmsg);
730                 memcpy(fdptr, fds, fds_size);
731                 msg.msg_controllen = cmsg->cmsg_len;
732         }
733 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
734
735         ret = sendmsg(ctx->sock, &msg, 0);
736         if (ret >= 0) {
737                 return 0;
738         }
739         if ((errno != EWOULDBLOCK) &&
740             (errno != EAGAIN) &&
741 #ifdef ENOBUFS
742             /* FreeBSD can give this for large messages */
743             (errno != ENOBUFS) &&
744 #endif
745             (errno != EINTR)) {
746                 return errno;
747         }
748
749         ret = unix_dgram_send_queue_init(ctx, dst, &q);
750         if (ret != 0) {
751                 return ret;
752         }
753         ret = queue_msg(q, iov, iovlen, fds, num_fds);
754         if (ret != 0) {
755                 unix_dgram_send_queue_free(q);
756                 return ret;
757         }
758         ret = pthreadpool_add_job(ctx->send_pool, q->sock,
759                                   unix_dgram_send_job, q->msgs);
760         if (ret != 0) {
761                 unix_dgram_send_queue_free(q);
762                 return ret;
763         }
764         return 0;
765 }
766
767 static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
768 {
769         return ctx->sock;
770 }
771
772 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
773 {
774         if (ctx->send_queues != NULL) {
775                 return EBUSY;
776         }
777
778         if (ctx->send_pool != NULL) {
779                 int ret = pthreadpool_destroy(ctx->send_pool);
780                 if (ret != 0) {
781                         return ret;
782                 }
783                 ctx->ev_funcs->watch_free(ctx->pool_read_watch);
784         }
785
786         ctx->ev_funcs->watch_free(ctx->sock_read_watch);
787
788         if (getpid() == ctx->created_pid) {
789                 /* If we created it, unlink. Otherwise someone else might
790                  * still have it open */
791                 unlink(ctx->path);
792         }
793
794         close(ctx->sock);
795         free(ctx->recv_buf);
796         free(ctx);
797         return 0;
798 }
799
800 /*
801  * Every message starts with a uint64_t cookie.
802  *
803  * A value of 0 indicates a single-fragment message which is complete in
804  * itself. The data immediately follows the cookie.
805  *
806  * Every multi-fragment message has a cookie != 0 and starts with a cookie
807  * followed by a struct unix_msg_header and then the data. The pid and sock
808  * fields are used to assure uniqueness on the receiver side.
809  */
810
811 struct unix_msg_hdr {
812         size_t msglen;
813         pid_t pid;
814         int sock;
815 };
816
817 struct unix_msg {
818         struct unix_msg *prev, *next;
819         size_t msglen;
820         size_t received;
821         pid_t sender_pid;
822         int sender_sock;
823         uint64_t cookie;
824         uint8_t buf[1];
825 };
826
827 struct unix_msg_ctx {
828         struct unix_dgram_ctx *dgram;
829         size_t fragment_len;
830         uint64_t cookie;
831
832         void (*recv_callback)(struct unix_msg_ctx *ctx,
833                               uint8_t *msg, size_t msg_len,
834                               int *fds, size_t num_fds,
835                               void *private_data);
836         void *private_data;
837
838         struct unix_msg *msgs;
839 };
840
841 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
842                           uint8_t *buf, size_t buflen,
843                           int *fds, size_t num_fds,
844                           void *private_data);
845
846 int unix_msg_init(const struct sockaddr_un *addr,
847                   const struct poll_funcs *ev_funcs,
848                   size_t fragment_len, uint64_t cookie,
849                   void (*recv_callback)(struct unix_msg_ctx *ctx,
850                                         uint8_t *msg, size_t msg_len,
851                                         int *fds, size_t num_fds,
852                                         void *private_data),
853                   void *private_data,
854                   struct unix_msg_ctx **result)
855 {
856         struct unix_msg_ctx *ctx;
857         int ret;
858
859         ctx = malloc(sizeof(*ctx));
860         if (ctx == NULL) {
861                 return ENOMEM;
862         }
863
864         *ctx = (struct unix_msg_ctx) {
865                 .fragment_len = fragment_len,
866                 .cookie = cookie,
867                 .recv_callback = recv_callback,
868                 .private_data = private_data
869         };
870
871         ret = unix_dgram_init(addr, fragment_len, ev_funcs,
872                               unix_msg_recv, ctx, &ctx->dgram);
873         if (ret != 0) {
874                 free(ctx);
875                 return ret;
876         }
877
878         *result = ctx;
879         return 0;
880 }
881
882 int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
883                   const struct iovec *iov, int iovlen,
884                   const int *fds, size_t num_fds)
885 {
886         ssize_t msglen;
887         size_t sent;
888         int ret = 0;
889         struct iovec iov_copy[iovlen+2];
890         struct unix_msg_hdr hdr;
891         struct iovec src_iov;
892
893         if (iovlen < 0) {
894                 return EINVAL;
895         }
896
897         msglen = iov_buflen(iov, iovlen);
898         if (msglen == -1) {
899                 return EINVAL;
900         }
901
902 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
903         if (num_fds > 0) {
904                 return ENOSYS;
905         }
906 #endif /* ! HAVE_STRUCT_MSGHDR_MSG_CONTROL */
907
908         if (num_fds > INT8_MAX) {
909                 return EINVAL;
910         }
911
912         if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
913                 uint64_t cookie = 0;
914
915                 iov_copy[0].iov_base = &cookie;
916                 iov_copy[0].iov_len = sizeof(cookie);
917                 if (iovlen > 0) {
918                         memcpy(&iov_copy[1], iov,
919                                sizeof(struct iovec) * iovlen);
920                 }
921
922                 return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
923                                        fds, num_fds);
924         }
925
926         hdr = (struct unix_msg_hdr) {
927                 .msglen = msglen,
928                 .pid = getpid(),
929                 .sock = unix_dgram_sock(ctx->dgram)
930         };
931
932         iov_copy[0].iov_base = &ctx->cookie;
933         iov_copy[0].iov_len = sizeof(ctx->cookie);
934         iov_copy[1].iov_base = &hdr;
935         iov_copy[1].iov_len = sizeof(hdr);
936
937         sent = 0;
938         src_iov = iov[0];
939
940         /*
941          * The following write loop sends the user message in pieces. We have
942          * filled the first two iovecs above with "cookie" and "hdr". In the
943          * following loops we pull message chunks from the user iov array and
944          * fill iov_copy piece by piece, possibly truncating chunks from the
945          * caller's iov array. Ugly, but hopefully efficient.
946          */
947
948         while (sent < msglen) {
949                 size_t fragment_len;
950                 size_t iov_index = 2;
951
952                 fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
953
954                 while (fragment_len < ctx->fragment_len) {
955                         size_t space, chunk;
956
957                         space = ctx->fragment_len - fragment_len;
958                         chunk = MIN(space, src_iov.iov_len);
959
960                         iov_copy[iov_index].iov_base = src_iov.iov_base;
961                         iov_copy[iov_index].iov_len = chunk;
962                         iov_index += 1;
963
964                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
965                         src_iov.iov_len -= chunk;
966                         fragment_len += chunk;
967
968                         if (src_iov.iov_len == 0) {
969                                 iov += 1;
970                                 iovlen -= 1;
971                                 if (iovlen == 0) {
972                                         break;
973                                 }
974                                 src_iov = iov[0];
975                         }
976                 }
977                 sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
978
979                 /*
980                  * only the last fragment should pass the fd array.
981                  * That simplifies the receiver a lot.
982                  */
983                 if (sent < msglen) {
984                         ret = unix_dgram_send(ctx->dgram, dst,
985                                               iov_copy, iov_index,
986                                               NULL, 0);
987                 } else {
988                         ret = unix_dgram_send(ctx->dgram, dst,
989                                               iov_copy, iov_index,
990                                               fds, num_fds);
991                 }
992                 if (ret != 0) {
993                         break;
994                 }
995         }
996
997         ctx->cookie += 1;
998         if (ctx->cookie == 0) {
999                 ctx->cookie += 1;
1000         }
1001
1002         return ret;
1003 }
1004
1005 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
1006                           uint8_t *buf, size_t buflen,
1007                           int *fds, size_t num_fds,
1008                           void *private_data)
1009 {
1010         struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
1011         struct unix_msg_hdr hdr;
1012         struct unix_msg *msg;
1013         size_t space;
1014         uint64_t cookie;
1015
1016         if (buflen < sizeof(cookie)) {
1017                 goto close_fds;
1018         }
1019
1020         memcpy(&cookie, buf, sizeof(cookie));
1021
1022         buf += sizeof(cookie);
1023         buflen -= sizeof(cookie);
1024
1025         if (cookie == 0) {
1026                 ctx->recv_callback(ctx, buf, buflen, fds, num_fds, ctx->private_data);
1027                 return;
1028         }
1029
1030         if (buflen < sizeof(hdr)) {
1031                 goto close_fds;
1032         }
1033         memcpy(&hdr, buf, sizeof(hdr));
1034
1035         buf += sizeof(hdr);
1036         buflen -= sizeof(hdr);
1037
1038         for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
1039                 if ((msg->sender_pid == hdr.pid) &&
1040                     (msg->sender_sock == hdr.sock)) {
1041                         break;
1042                 }
1043         }
1044
1045         if ((msg != NULL) && (msg->cookie != cookie)) {
1046                 DLIST_REMOVE(ctx->msgs, msg);
1047                 free(msg);
1048                 msg = NULL;
1049         }
1050
1051         if (msg == NULL) {
1052                 msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
1053                 if (msg == NULL) {
1054                         goto close_fds;
1055                 }
1056                 *msg = (struct unix_msg) {
1057                         .msglen = hdr.msglen,
1058                         .sender_pid = hdr.pid,
1059                         .sender_sock = hdr.sock,
1060                         .cookie = cookie
1061                 };
1062                 DLIST_ADD(ctx->msgs, msg);
1063         }
1064
1065         space = msg->msglen - msg->received;
1066         if (buflen > space) {
1067                 goto close_fds;
1068         }
1069
1070         memcpy(msg->buf + msg->received, buf, buflen);
1071         msg->received += buflen;
1072
1073         if (msg->received < msg->msglen) {
1074                 goto close_fds;
1075         }
1076
1077         DLIST_REMOVE(ctx->msgs, msg);
1078         ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds, ctx->private_data);
1079         free(msg);
1080         return;
1081
1082 close_fds:
1083         close_fd_array(fds, num_fds);
1084 }
1085
1086 int unix_msg_free(struct unix_msg_ctx *ctx)
1087 {
1088         int ret;
1089
1090         ret = unix_dgram_free(ctx->dgram);
1091         if (ret != 0) {
1092                 return ret;
1093         }
1094
1095         while (ctx->msgs != NULL) {
1096                 struct unix_msg *msg = ctx->msgs;
1097                 DLIST_REMOVE(ctx->msgs, msg);
1098                 free(msg);
1099         }
1100
1101         free(ctx);
1102         return 0;
1103 }