s3:unix_msg: close the fds in unix_dgram_recv_handler() after the callback has run
[obnox/samba/samba-obnox.git] / source3 / lib / unix_msg / unix_msg.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Copyright (C) Volker Lendecke 2013
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 #include "replace.h"
20 #include "unix_msg.h"
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "dlinklist.h"
25 #include "pthreadpool/pthreadpool.h"
26 #include <fcntl.h>
27
28 /*
29  * This file implements two abstractions: The "unix_dgram" functions implement
30  * queueing for unix domain datagram sockets. You can send to a destination
31  * socket, and if that has no free space available, it will fall back to an
32  * anonymous socket that will poll for writability. "unix_dgram" expects the
33  * data size not to exceed the system limit.
34  *
35  * The "unix_msg" functions implement the fragmentation of large messages on
36  * top of "unix_dgram". This is what is exposed to the user of this API.
37  */
38
39 struct unix_dgram_msg {
40         struct unix_dgram_msg *prev, *next;
41
42         int sock;
43         ssize_t sent;
44         int sys_errno;
45         size_t num_fds;
46         struct msghdr msg;
47         struct iovec iov;
48 };
49
50 struct unix_dgram_send_queue {
51         struct unix_dgram_send_queue *prev, *next;
52         struct unix_dgram_ctx *ctx;
53         int sock;
54         struct unix_dgram_msg *msgs;
55         char path[];
56 };
57
58 struct unix_dgram_ctx {
59         int sock;
60         pid_t created_pid;
61         const struct poll_funcs *ev_funcs;
62         size_t max_msg;
63
64         void (*recv_callback)(struct unix_dgram_ctx *ctx,
65                               uint8_t *msg, size_t msg_len,
66                               int *fds, size_t num_fds,
67                               void *private_data);
68         void *private_data;
69
70         struct poll_watch *sock_read_watch;
71         struct unix_dgram_send_queue *send_queues;
72
73         struct pthreadpool *send_pool;
74         struct poll_watch *pool_read_watch;
75
76         uint8_t *recv_buf;
77         char path[];
78 };
79
80 static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
81 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
82                                     void *private_data);
83
84 /* Set socket non blocking. */
85 static int prepare_socket_nonblock(int sock)
86 {
87         int flags;
88 #ifdef O_NONBLOCK
89 #define FLAG_TO_SET O_NONBLOCK
90 #else
91 #ifdef SYSV
92 #define FLAG_TO_SET O_NDELAY
93 #else /* BSD */
94 #define FLAG_TO_SET FNDELAY
95 #endif
96 #endif
97
98         flags = fcntl(sock, F_GETFL);
99         if (flags == -1) {
100                 return errno;
101         }
102         flags |= FLAG_TO_SET;
103         if (fcntl(sock, F_SETFL, flags) == -1) {
104                 return errno;
105         }
106
107 #undef FLAG_TO_SET
108         return 0;
109 }
110
111 /* Set socket close on exec. */
112 static int prepare_socket_cloexec(int sock)
113 {
114 #ifdef FD_CLOEXEC
115         int flags;
116
117         flags = fcntl(sock, F_GETFD, 0);
118         if (flags == -1) {
119                 return errno;
120         }
121         flags |= FD_CLOEXEC;
122         if (fcntl(sock, F_SETFD, flags) == -1) {
123                 return errno;
124         }
125 #endif
126         return 0;
127 }
128
129 /* Set socket non blocking and close on exec. */
130 static int prepare_socket(int sock)
131 {
132         int ret = prepare_socket_nonblock(sock);
133
134         if (ret) {
135                 return ret;
136         }
137         return prepare_socket_cloexec(sock);
138 }
139
140 static void extract_fd_array_from_msghdr(struct msghdr *msg, int **fds,
141                                          size_t *num_fds)
142 {
143 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
144         struct cmsghdr *cmsg;
145
146         for(cmsg = CMSG_FIRSTHDR(msg);
147             cmsg != NULL;
148             cmsg = CMSG_NXTHDR(msg, cmsg))
149         {
150                 void *data = CMSG_DATA(cmsg);
151
152                 if (cmsg->cmsg_type != SCM_RIGHTS) {
153                         continue;
154                 }
155                 if (cmsg->cmsg_level != SOL_SOCKET) {
156                         continue;
157                 }
158
159                 *fds = (int *)data;
160                 *num_fds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof (int);
161                 break;
162         }
163 #endif
164 }
165
166 static void close_fd_array(int *fds, size_t num_fds)
167 {
168         size_t i;
169
170         for (i = 0; i < num_fds; i++) {
171                 if (fds[i] == -1) {
172                         continue;
173                 }
174
175                 close(fds[i]);
176                 fds[i] = -1;
177         }
178 }
179
180 static void close_fd_array_cmsg(struct msghdr *msg)
181 {
182         int *fds = NULL;
183         size_t num_fds = 0;
184
185         extract_fd_array_from_msghdr(msg, &fds, &num_fds);
186
187         /*
188          * TODO: caveat - side-effect - changing msg ???
189          */
190         close_fd_array(fds, num_fds);
191 }
192
193 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
194                            const struct poll_funcs *ev_funcs,
195                            void (*recv_callback)(struct unix_dgram_ctx *ctx,
196                                                  uint8_t *msg, size_t msg_len,
197                                                  int *fds, size_t num_fds,
198                                                  void *private_data),
199                            void *private_data,
200                            struct unix_dgram_ctx **result)
201 {
202         struct unix_dgram_ctx *ctx;
203         size_t pathlen;
204         int ret;
205
206         if (addr != NULL) {
207                 pathlen = strlen(addr->sun_path)+1;
208         } else {
209                 pathlen = 1;
210         }
211
212         ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
213         if (ctx == NULL) {
214                 return ENOMEM;
215         }
216         if (addr != NULL) {
217                 memcpy(ctx->path, addr->sun_path, pathlen);
218         } else {
219                 ctx->path[0] = '\0';
220         }
221
222         *ctx = (struct unix_dgram_ctx) {
223                 .max_msg = max_msg,
224                 .ev_funcs = ev_funcs,
225                 .recv_callback = recv_callback,
226                 .private_data = private_data,
227                 .created_pid = (pid_t)-1
228         };
229
230         ctx->recv_buf = malloc(max_msg);
231         if (ctx->recv_buf == NULL) {
232                 free(ctx);
233                 return ENOMEM;
234         }
235
236         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
237         if (ctx->sock == -1) {
238                 ret = errno;
239                 goto fail_free;
240         }
241
242         /* Set non-blocking and close-on-exec. */
243         ret = prepare_socket(ctx->sock);
244         if (ret != 0) {
245                 goto fail_close;
246         }
247
248         if (addr != NULL) {
249                 ret = bind(ctx->sock,
250                            (const struct sockaddr *)(const void *)addr,
251                            sizeof(*addr));
252                 if (ret == -1) {
253                         ret = errno;
254                         goto fail_close;
255                 }
256
257                 ctx->created_pid = getpid();
258
259                 ctx->sock_read_watch = ctx->ev_funcs->watch_new(
260                         ctx->ev_funcs, ctx->sock, POLLIN,
261                         unix_dgram_recv_handler, ctx);
262
263                 if (ctx->sock_read_watch == NULL) {
264                         ret = ENOMEM;
265                         goto fail_close;
266                 }
267         }
268
269         *result = ctx;
270         return 0;
271
272 fail_close:
273         close(ctx->sock);
274 fail_free:
275         free(ctx->recv_buf);
276         free(ctx);
277         return ret;
278 }
279
280 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
281                                     void *private_data)
282 {
283         struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
284         ssize_t received;
285         int flags = 0;
286         struct msghdr msg;
287         struct iovec iov;
288 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
289         char buf[CMSG_SPACE(sizeof(int)*INT8_MAX)] = { 0, };
290 #endif /* HAVE_STRUCT_MSGHDR_MSG_CONTROL */
291         int *fds = NULL;
292         size_t i, num_fds = 0;
293
294         iov = (struct iovec) {
295                 .iov_base = (void *)ctx->recv_buf,
296                 .iov_len = ctx->max_msg,
297         };
298
299         msg = (struct msghdr) {
300                 .msg_iov = &iov,
301                 .msg_iovlen = 1,
302 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
303                 .msg_control = buf,
304                 .msg_controllen = sizeof(buf),
305 #endif
306         };
307
308 #ifdef MSG_CMSG_CLOEXEC
309         flags |= MSG_CMSG_CLOEXEC;
310 #endif
311
312         received = recvmsg(fd, &msg, flags);
313         if (received == -1) {
314                 if ((errno == EAGAIN) ||
315                     (errno == EWOULDBLOCK) ||
316                     (errno == EINTR) || (errno == ENOMEM)) {
317                         /* Not really an error - just try again. */
318                         return;
319                 }
320                 /* Problem with the socket. Set it unreadable. */
321                 ctx->ev_funcs->watch_update(w, 0);
322                 return;
323         }
324         if (received > ctx->max_msg) {
325                 /* More than we expected, not for us */
326                 return;
327         }
328
329         extract_fd_array_from_msghdr(&msg, &fds, &num_fds);
330
331         for (i = 0; i < num_fds; i++) {
332                 int err;
333
334                 err = prepare_socket_cloexec(fds[i]);
335                 if (err != 0) {
336                         goto cleanup_fds;
337                 }
338         }
339
340         ctx->recv_callback(ctx, ctx->recv_buf, received,
341                            fds, num_fds, ctx->private_data);
342
343         /*
344          * Close those fds that the callback has not set to -1.
345          */
346         close_fd_array(fds, num_fds);
347
348         return;
349
350 cleanup_fds:
351         close_fd_array(fds, num_fds);
352
353         ctx->recv_callback(ctx, ctx->recv_buf, received,
354                            NULL, 0, ctx->private_data);
355 }
356
357 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
358                                     void *private_data);
359
360 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
361 {
362         int ret, signalfd;
363
364         if (ctx->send_pool != NULL) {
365                 return 0;
366         }
367
368         ret = pthreadpool_init(0, &ctx->send_pool);
369         if (ret != 0) {
370                 return ret;
371         }
372
373         signalfd = pthreadpool_signal_fd(ctx->send_pool);
374
375         ctx->pool_read_watch = ctx->ev_funcs->watch_new(
376                 ctx->ev_funcs, signalfd, POLLIN,
377                 unix_dgram_job_finished, ctx);
378         if (ctx->pool_read_watch == NULL) {
379                 pthreadpool_destroy(ctx->send_pool);
380                 ctx->send_pool = NULL;
381                 return ENOMEM;
382         }
383
384         return 0;
385 }
386
387 static int unix_dgram_send_queue_init(
388         struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
389         struct unix_dgram_send_queue **result)
390 {
391         struct unix_dgram_send_queue *q;
392         size_t pathlen;
393         int ret, err;
394
395         pathlen = strlen(dst->sun_path)+1;
396
397         q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
398         if (q == NULL) {
399                 return ENOMEM;
400         }
401         q->ctx = ctx;
402         q->msgs = NULL;
403         memcpy(q->path, dst->sun_path, pathlen);
404
405         q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
406         if (q->sock == -1) {
407                 err = errno;
408                 goto fail_free;
409         }
410
411         err = prepare_socket_cloexec(q->sock);
412         if (err != 0) {
413                 goto fail_close;
414         }
415
416         do {
417                 ret = connect(q->sock,
418                               (const struct sockaddr *)(const void *)dst,
419                               sizeof(*dst));
420         } while ((ret == -1) && (errno == EINTR));
421
422         if (ret == -1) {
423                 err = errno;
424                 goto fail_close;
425         }
426
427         err = unix_dgram_init_pthreadpool(ctx);
428         if (err != 0) {
429                 goto fail_close;
430         }
431
432         DLIST_ADD(ctx->send_queues, q);
433
434         *result = q;
435         return 0;
436
437 fail_close:
438         close(q->sock);
439 fail_free:
440         free(q);
441         return err;
442 }
443
444 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
445 {
446         struct unix_dgram_ctx *ctx = q->ctx;
447
448         while (q->msgs != NULL) {
449                 struct unix_dgram_msg *msg;
450                 msg = q->msgs;
451                 DLIST_REMOVE(q->msgs, msg);
452                 close_fd_array_cmsg(&msg->msg);
453                 free(msg);
454         }
455         close(q->sock);
456         DLIST_REMOVE(ctx->send_queues, q);
457         free(q);
458 }
459
460 static struct unix_dgram_send_queue *find_send_queue(
461         struct unix_dgram_ctx *ctx, const char *dst_sock)
462 {
463         struct unix_dgram_send_queue *s;
464
465         for (s = ctx->send_queues; s != NULL; s = s->next) {
466                 if (strcmp(s->path, dst_sock) == 0) {
467                         return s;
468                 }
469         }
470         return NULL;
471 }
472
473 static int queue_msg(struct unix_dgram_send_queue *q,
474                      const struct iovec *iov, int iovlen,
475                      const int *fds, size_t num_fds)
476 {
477         struct unix_dgram_msg *msg;
478         ssize_t data_len;
479         uint8_t *data_buf;
480         size_t msglen;
481         int i;
482         size_t tmp;
483         int ret = -1;
484 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
485         size_t fds_size = sizeof(int) * MIN(num_fds, INT8_MAX);
486         int fds_copy[MIN(num_fds, INT8_MAX)];
487         size_t cmsg_len = CMSG_LEN(fds_size);
488         size_t cmsg_space = CMSG_SPACE(fds_size);
489         char *cmsg_buf;
490 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
491
492         if (num_fds > INT8_MAX) {
493                 return EINVAL;
494         }
495
496 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
497         if (num_fds > 0) {
498                 return ENOSYS;
499         }
500 #endif
501
502         msglen = sizeof(struct unix_dgram_msg);
503
504         /*
505          * Note: No need to check for overflow here,
506          * since cmsg will store <= INT8_MAX fds.
507          */
508         msglen += cmsg_space;
509
510         data_len = iov_buflen(iov, iovlen);
511         if (data_len == -1) {
512                 return EINVAL;
513         }
514
515         tmp = msglen + data_len;
516         if ((tmp < msglen) || (tmp < data_len)) {
517                 /* overflow */
518                 return EINVAL;
519         }
520         msglen = tmp;
521
522 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
523         for (i = 0; i < num_fds; i++) {
524                 fds_copy[i] = -1;
525         }
526
527         for (i = 0; i < num_fds; i++) {
528                 fds_copy[i] = dup(fds[i]);
529                 if (fds_copy[i] == -1) {
530                         ret = errno;
531                         goto fail;
532                 }
533         }
534 #endif
535
536         msg = malloc(msglen);
537         if (msg == NULL) {
538                 ret = ENOMEM;
539                 goto fail;
540         }
541
542         msg->sock = q->sock;
543         msg->num_fds = num_fds;
544
545         data_buf = (uint8_t *)(msg + 1);
546
547 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
548         if (num_fds > 0) {
549                 cmsg_buf = (char *)data_buf;
550                 memset(cmsg_buf, 0, cmsg_space);
551                 data_buf += cmsg_space;
552         } else {
553                 cmsg_buf = NULL;
554                 cmsg_space = 0;
555         }
556 #endif
557
558         msg->iov = (struct iovec) {
559                 .iov_base = (void *)data_buf,
560                 .iov_len = data_len,
561         };
562
563         msg->msg = (struct msghdr) {
564                 .msg_iov = &msg->iov,
565                 .msg_iovlen = 1,
566 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
567                 .msg_control = cmsg_buf,
568                 .msg_controllen = cmsg_space,
569 #endif
570         };
571
572 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
573         if (num_fds > 0) {
574                 struct cmsghdr *cmsg;
575                 void *fdptr;
576
577                 cmsg = CMSG_FIRSTHDR(&msg->msg);
578                 cmsg->cmsg_level = SOL_SOCKET;
579                 cmsg->cmsg_type = SCM_RIGHTS;
580                 cmsg->cmsg_len = cmsg_len;
581                 fdptr = CMSG_DATA(cmsg);
582                 memcpy(fdptr, fds_copy, fds_size);
583                 msg->msg.msg_controllen = cmsg->cmsg_len;
584         }
585 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
586
587         for (i=0; i<iovlen; i++) {
588                 memcpy(data_buf, iov[i].iov_base, iov[i].iov_len);
589                 data_buf += iov[i].iov_len;
590         }
591
592         DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
593         return 0;
594
595 fail:
596         close_fd_array(fds_copy, num_fds);
597         return ret;
598 }
599
600 static void unix_dgram_send_job(void *private_data)
601 {
602         struct unix_dgram_msg *dmsg = private_data;
603
604         do {
605                 dmsg->sent = sendmsg(dmsg->sock, &dmsg->msg, 0);
606         } while ((dmsg->sent == -1) && (errno == EINTR));
607
608         if (dmsg->sent == -1) {
609                 dmsg->sys_errno = errno;
610         }
611 }
612
613 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
614                                     void *private_data)
615 {
616         struct unix_dgram_ctx *ctx = private_data;
617         struct unix_dgram_send_queue *q;
618         struct unix_dgram_msg *msg;
619         int ret, job;
620
621         ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
622         if (ret != 1) {
623                 return;
624         }
625
626         for (q = ctx->send_queues; q != NULL; q = q->next) {
627                 if (job == q->sock) {
628                         break;
629                 }
630         }
631
632         if (q == NULL) {
633                 /* Huh? Should not happen */
634                 return;
635         }
636
637         msg = q->msgs;
638         DLIST_REMOVE(q->msgs, msg);
639         close_fd_array_cmsg(&msg->msg);
640         free(msg);
641
642         if (q->msgs != NULL) {
643                 ret = pthreadpool_add_job(ctx->send_pool, q->sock,
644                                           unix_dgram_send_job, q->msgs);
645                 if (ret == 0) {
646                         return;
647                 }
648         }
649
650         unix_dgram_send_queue_free(q);
651 }
652
653 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
654                            const struct sockaddr_un *dst,
655                            const struct iovec *iov, int iovlen,
656                            const int *fds, size_t num_fds)
657 {
658         struct unix_dgram_send_queue *q;
659         struct msghdr msg;
660 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
661         struct cmsghdr *cmsg;
662         size_t fds_size = sizeof(int) * num_fds;
663         size_t cmsg_len = CMSG_LEN(fds_size);
664         size_t cmsg_space = CMSG_SPACE(fds_size);
665         char cmsg_buf[cmsg_space];
666 #endif /* HAVE_STRUCT_MSGHDR_MSG_CONTROL */
667         int ret;
668         int i;
669
670         if (num_fds > INT8_MAX) {
671                 return EINVAL;
672         }
673
674 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
675         if (num_fds > 0) {
676                 return ENOSYS;
677         }
678 #endif /* ! HAVE_STRUCT_MSGHDR_MSG_CONTROL */
679
680         for (i = 0; i < num_fds; i++) {
681                 /*
682                  * Make sure we only allow fd passing
683                  * for communication channels,
684                  * e.g. sockets, pipes, fifos, ...
685                  */
686                 ret = lseek(fds[i], 0, SEEK_CUR);
687                 if (ret == -1 && errno == ESPIPE) {
688                         /* ok */
689                         continue;
690                 }
691
692                 /*
693                  * Reject the message as we may need to call dup(),
694                  * if we queue the message.
695                  *
696                  * That might result in unexpected behavior for the caller
697                  * for files and broken posix locking.
698                  */
699                 return EINVAL;
700         }
701
702         /*
703          * To preserve message ordering, we have to queue a message when
704          * others are waiting in line already.
705          */
706         q = find_send_queue(ctx, dst->sun_path);
707         if (q != NULL) {
708                 return queue_msg(q, iov, iovlen, fds, num_fds);
709         }
710
711         /*
712          * Try a cheap nonblocking send
713          */
714
715         msg = (struct msghdr) {
716                 .msg_name = discard_const_p(struct sockaddr_un, dst),
717                 .msg_namelen = sizeof(*dst),
718                 .msg_iov = discard_const_p(struct iovec, iov),
719                 .msg_iovlen = iovlen
720         };
721 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
722         if (num_fds > 0) {
723                 void *fdptr;
724
725                 memset(cmsg_buf, 0, cmsg_space);
726
727                 msg.msg_control = cmsg_buf;
728                 msg.msg_controllen = cmsg_space;
729                 cmsg = CMSG_FIRSTHDR(&msg);
730                 cmsg->cmsg_level = SOL_SOCKET;
731                 cmsg->cmsg_type = SCM_RIGHTS;
732                 cmsg->cmsg_len = cmsg_len;
733                 fdptr = CMSG_DATA(cmsg);
734                 memcpy(fdptr, fds, fds_size);
735                 msg.msg_controllen = cmsg->cmsg_len;
736         }
737 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
738
739         ret = sendmsg(ctx->sock, &msg, 0);
740         if (ret >= 0) {
741                 return 0;
742         }
743         if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
744                 return errno;
745         }
746
747         ret = unix_dgram_send_queue_init(ctx, dst, &q);
748         if (ret != 0) {
749                 return ret;
750         }
751         ret = queue_msg(q, iov, iovlen, fds, num_fds);
752         if (ret != 0) {
753                 unix_dgram_send_queue_free(q);
754                 return ret;
755         }
756         ret = pthreadpool_add_job(ctx->send_pool, q->sock,
757                                   unix_dgram_send_job, q->msgs);
758         if (ret != 0) {
759                 unix_dgram_send_queue_free(q);
760                 return ret;
761         }
762         return 0;
763 }
764
765 static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
766 {
767         return ctx->sock;
768 }
769
770 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
771 {
772         if (ctx->send_queues != NULL) {
773                 return EBUSY;
774         }
775
776         if (ctx->send_pool != NULL) {
777                 int ret = pthreadpool_destroy(ctx->send_pool);
778                 if (ret != 0) {
779                         return ret;
780                 }
781                 ctx->ev_funcs->watch_free(ctx->pool_read_watch);
782         }
783
784         ctx->ev_funcs->watch_free(ctx->sock_read_watch);
785
786         if (getpid() == ctx->created_pid) {
787                 /* If we created it, unlink. Otherwise someone else might
788                  * still have it open */
789                 unlink(ctx->path);
790         }
791
792         close(ctx->sock);
793         free(ctx->recv_buf);
794         free(ctx);
795         return 0;
796 }
797
798 /*
799  * Every message starts with a uint64_t cookie.
800  *
801  * A value of 0 indicates a single-fragment message which is complete in
802  * itself. The data immediately follows the cookie.
803  *
804  * Every multi-fragment message has a cookie != 0 and starts with a cookie
805  * followed by a struct unix_msg_header and then the data. The pid and sock
806  * fields are used to assure uniqueness on the receiver side.
807  */
808
809 struct unix_msg_hdr {
810         size_t msglen;
811         pid_t pid;
812         int sock;
813 };
814
815 struct unix_msg {
816         struct unix_msg *prev, *next;
817         size_t msglen;
818         size_t received;
819         pid_t sender_pid;
820         int sender_sock;
821         uint64_t cookie;
822         uint8_t buf[1];
823 };
824
825 struct unix_msg_ctx {
826         struct unix_dgram_ctx *dgram;
827         size_t fragment_len;
828         uint64_t cookie;
829
830         void (*recv_callback)(struct unix_msg_ctx *ctx,
831                               uint8_t *msg, size_t msg_len,
832                               int *fds, size_t num_fds,
833                               void *private_data);
834         void *private_data;
835
836         struct unix_msg *msgs;
837 };
838
839 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
840                           uint8_t *buf, size_t buflen,
841                           int *fds, size_t num_fds,
842                           void *private_data);
843
844 int unix_msg_init(const struct sockaddr_un *addr,
845                   const struct poll_funcs *ev_funcs,
846                   size_t fragment_len, uint64_t cookie,
847                   void (*recv_callback)(struct unix_msg_ctx *ctx,
848                                         uint8_t *msg, size_t msg_len,
849                                         int *fds, size_t num_fds,
850                                         void *private_data),
851                   void *private_data,
852                   struct unix_msg_ctx **result)
853 {
854         struct unix_msg_ctx *ctx;
855         int ret;
856
857         ctx = malloc(sizeof(*ctx));
858         if (ctx == NULL) {
859                 return ENOMEM;
860         }
861
862         *ctx = (struct unix_msg_ctx) {
863                 .fragment_len = fragment_len,
864                 .cookie = cookie,
865                 .recv_callback = recv_callback,
866                 .private_data = private_data
867         };
868
869         ret = unix_dgram_init(addr, fragment_len, ev_funcs,
870                               unix_msg_recv, ctx, &ctx->dgram);
871         if (ret != 0) {
872                 free(ctx);
873                 return ret;
874         }
875
876         *result = ctx;
877         return 0;
878 }
879
880 int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
881                   const struct iovec *iov, int iovlen,
882                   const int *fds, size_t num_fds)
883 {
884         ssize_t msglen;
885         size_t sent;
886         int ret = 0;
887         struct iovec iov_copy[iovlen+2];
888         struct unix_msg_hdr hdr;
889         struct iovec src_iov;
890
891         if (iovlen < 0) {
892                 return EINVAL;
893         }
894
895         msglen = iov_buflen(iov, iovlen);
896         if (msglen == -1) {
897                 return EINVAL;
898         }
899
900 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
901         if (num_fds > 0) {
902                 return ENOSYS;
903         }
904 #endif /* ! HAVE_STRUCT_MSGHDR_MSG_CONTROL */
905
906         if (num_fds > INT8_MAX) {
907                 return EINVAL;
908         }
909
910         if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
911                 uint64_t cookie = 0;
912
913                 iov_copy[0].iov_base = &cookie;
914                 iov_copy[0].iov_len = sizeof(cookie);
915                 if (iovlen > 0) {
916                         memcpy(&iov_copy[1], iov,
917                                sizeof(struct iovec) * iovlen);
918                 }
919
920                 return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
921                                        fds, num_fds);
922         }
923
924         hdr = (struct unix_msg_hdr) {
925                 .msglen = msglen,
926                 .pid = getpid(),
927                 .sock = unix_dgram_sock(ctx->dgram)
928         };
929
930         iov_copy[0].iov_base = &ctx->cookie;
931         iov_copy[0].iov_len = sizeof(ctx->cookie);
932         iov_copy[1].iov_base = &hdr;
933         iov_copy[1].iov_len = sizeof(hdr);
934
935         sent = 0;
936         src_iov = iov[0];
937
938         /*
939          * The following write loop sends the user message in pieces. We have
940          * filled the first two iovecs above with "cookie" and "hdr". In the
941          * following loops we pull message chunks from the user iov array and
942          * fill iov_copy piece by piece, possibly truncating chunks from the
943          * caller's iov array. Ugly, but hopefully efficient.
944          */
945
946         while (sent < msglen) {
947                 size_t fragment_len;
948                 size_t iov_index = 2;
949
950                 fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
951
952                 while (fragment_len < ctx->fragment_len) {
953                         size_t space, chunk;
954
955                         space = ctx->fragment_len - fragment_len;
956                         chunk = MIN(space, src_iov.iov_len);
957
958                         iov_copy[iov_index].iov_base = src_iov.iov_base;
959                         iov_copy[iov_index].iov_len = chunk;
960                         iov_index += 1;
961
962                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
963                         src_iov.iov_len -= chunk;
964                         fragment_len += chunk;
965
966                         if (src_iov.iov_len == 0) {
967                                 iov += 1;
968                                 iovlen -= 1;
969                                 if (iovlen == 0) {
970                                         break;
971                                 }
972                                 src_iov = iov[0];
973                         }
974                 }
975                 sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
976
977                 /*
978                  * only the last fragment should pass the fd array.
979                  * That simplifies the receiver a lot.
980                  */
981                 if (sent < msglen) {
982                         ret = unix_dgram_send(ctx->dgram, dst,
983                                               iov_copy, iov_index,
984                                               NULL, 0);
985                 } else {
986                         ret = unix_dgram_send(ctx->dgram, dst,
987                                               iov_copy, iov_index,
988                                               fds, num_fds);
989                 }
990                 if (ret != 0) {
991                         break;
992                 }
993         }
994
995         ctx->cookie += 1;
996         if (ctx->cookie == 0) {
997                 ctx->cookie += 1;
998         }
999
1000         return ret;
1001 }
1002
1003 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
1004                           uint8_t *buf, size_t buflen,
1005                           int *fds, size_t num_fds,
1006                           void *private_data)
1007 {
1008         struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
1009         struct unix_msg_hdr hdr;
1010         struct unix_msg *msg;
1011         size_t space;
1012         uint64_t cookie;
1013
1014         if (buflen < sizeof(cookie)) {
1015                 goto close_fds;
1016         }
1017
1018         memcpy(&cookie, buf, sizeof(cookie));
1019
1020         buf += sizeof(cookie);
1021         buflen -= sizeof(cookie);
1022
1023         if (cookie == 0) {
1024                 ctx->recv_callback(ctx, buf, buflen, fds, num_fds, ctx->private_data);
1025                 return;
1026         }
1027
1028         if (buflen < sizeof(hdr)) {
1029                 goto close_fds;
1030         }
1031         memcpy(&hdr, buf, sizeof(hdr));
1032
1033         buf += sizeof(hdr);
1034         buflen -= sizeof(hdr);
1035
1036         for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
1037                 if ((msg->sender_pid == hdr.pid) &&
1038                     (msg->sender_sock == hdr.sock)) {
1039                         break;
1040                 }
1041         }
1042
1043         if ((msg != NULL) && (msg->cookie != cookie)) {
1044                 DLIST_REMOVE(ctx->msgs, msg);
1045                 free(msg);
1046                 msg = NULL;
1047         }
1048
1049         if (msg == NULL) {
1050                 msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
1051                 if (msg == NULL) {
1052                         goto close_fds;
1053                 }
1054                 *msg = (struct unix_msg) {
1055                         .msglen = hdr.msglen,
1056                         .sender_pid = hdr.pid,
1057                         .sender_sock = hdr.sock,
1058                         .cookie = cookie
1059                 };
1060                 DLIST_ADD(ctx->msgs, msg);
1061         }
1062
1063         space = msg->msglen - msg->received;
1064         if (buflen > space) {
1065                 goto close_fds;
1066         }
1067
1068         memcpy(msg->buf + msg->received, buf, buflen);
1069         msg->received += buflen;
1070
1071         if (msg->received < msg->msglen) {
1072                 goto close_fds;
1073         }
1074
1075         DLIST_REMOVE(ctx->msgs, msg);
1076         ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds, ctx->private_data);
1077         free(msg);
1078         return;
1079
1080 close_fds:
1081         close_fd_array(fds, num_fds);
1082 }
1083
1084 int unix_msg_free(struct unix_msg_ctx *ctx)
1085 {
1086         int ret;
1087
1088         ret = unix_dgram_free(ctx->dgram);
1089         if (ret != 0) {
1090                 return ret;
1091         }
1092
1093         while (ctx->msgs != NULL) {
1094                 struct unix_msg *msg = ctx->msgs;
1095                 DLIST_REMOVE(ctx->msgs, msg);
1096                 free(msg);
1097         }
1098
1099         free(ctx);
1100         return 0;
1101 }
1102
1103 static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
1104 {
1105         size_t buflen = 0;
1106         int i;
1107
1108         for (i=0; i<iovlen; i++) {
1109                 size_t thislen = iov[i].iov_len;
1110                 size_t tmp = buflen + thislen;
1111
1112                 if ((tmp < buflen) || (tmp < thislen)) {
1113                         /* overflow */
1114                         return -1;
1115                 }
1116                 buflen = tmp;
1117         }
1118         return buflen;
1119 }