unix_msg: introduce send queue caching
[metze/samba/wip.git] / source3 / lib / unix_msg / unix_msg.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Copyright (C) Volker Lendecke 2013
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 #include "replace.h"
20 #include "unix_msg.h"
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "lib/util/dlinklist.h"
25 #include "pthreadpool/pthreadpool_pipe.h"
26 #include "lib/util/iov_buf.h"
27 #include "lib/util/msghdr.h"
28 #include <fcntl.h>
29 #include "lib/util/time.h"
30
31 /*
32  * This file implements two abstractions: The "unix_dgram" functions implement
33  * queueing for unix domain datagram sockets. You can send to a destination
34  * socket, and if that has no free space available, it will fall back to an
35  * anonymous socket that will poll for writability. "unix_dgram" expects the
36  * data size not to exceed the system limit.
37  *
38  * The "unix_msg" functions implement the fragmentation of large messages on
39  * top of "unix_dgram". This is what is exposed to the user of this API.
40  */
41
42 struct unix_dgram_msg {
43         struct unix_dgram_msg *prev, *next;
44
45         int sock;
46         ssize_t sent;
47         int sys_errno;
48 };
49
50 struct unix_dgram_send_queue {
51         struct unix_dgram_send_queue *prev, *next;
52         struct unix_dgram_ctx *ctx;
53         int sock;
54         struct unix_dgram_msg *msgs;
55         struct poll_timeout *timeout;
56         char path[];
57 };
58
59 struct unix_dgram_ctx {
60         int sock;
61         pid_t created_pid;
62         const struct poll_funcs *ev_funcs;
63         size_t max_msg;
64
65         void (*recv_callback)(struct unix_dgram_ctx *ctx,
66                               uint8_t *msg, size_t msg_len,
67                               int *fds, size_t num_fds,
68                               void *private_data);
69         void *private_data;
70
71         struct poll_watch *sock_read_watch;
72         struct unix_dgram_send_queue *send_queues;
73
74         struct pthreadpool_pipe *send_pool;
75         struct poll_watch *pool_read_watch;
76
77         uint8_t *recv_buf;
78         char path[];
79 };
80
81 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
82                                     void *private_data);
83
84 /* Set socket non blocking. */
85 static int prepare_socket_nonblock(int sock, bool nonblock)
86 {
87         int flags;
88 #ifdef O_NONBLOCK
89 #define FLAG_TO_SET O_NONBLOCK
90 #else
91 #ifdef SYSV
92 #define FLAG_TO_SET O_NDELAY
93 #else /* BSD */
94 #define FLAG_TO_SET FNDELAY
95 #endif
96 #endif
97
98         flags = fcntl(sock, F_GETFL);
99         if (flags == -1) {
100                 return errno;
101         }
102         if (nonblock) {
103                 flags |= FLAG_TO_SET;
104         } else {
105                 flags &= ~FLAG_TO_SET;
106         }
107         if (fcntl(sock, F_SETFL, flags) == -1) {
108                 return errno;
109         }
110
111 #undef FLAG_TO_SET
112         return 0;
113 }
114
115 /* Set socket close on exec. */
116 static int prepare_socket_cloexec(int sock)
117 {
118 #ifdef FD_CLOEXEC
119         int flags;
120
121         flags = fcntl(sock, F_GETFD, 0);
122         if (flags == -1) {
123                 return errno;
124         }
125         flags |= FD_CLOEXEC;
126         if (fcntl(sock, F_SETFD, flags) == -1) {
127                 return errno;
128         }
129 #endif
130         return 0;
131 }
132
133 /* Set socket non blocking and close on exec. */
134 static int prepare_socket(int sock)
135 {
136         int ret = prepare_socket_nonblock(sock, true);
137
138         if (ret) {
139                 return ret;
140         }
141         return prepare_socket_cloexec(sock);
142 }
143
144 static size_t unix_dgram_msg_size(void)
145 {
146         size_t msgsize = sizeof(struct unix_dgram_msg);
147         msgsize = (msgsize + 15) & ~15; /* align to 16 */
148         return msgsize;
149 }
150
151 static struct msghdr_buf *unix_dgram_msghdr(struct unix_dgram_msg *msg)
152 {
153         /*
154          * Not portable in C99, but "msg" is aligned and so is
155          * unix_dgram_msg_size()
156          */
157         return (struct msghdr_buf *)(((char *)msg) + unix_dgram_msg_size());
158 }
159
160 static void close_fd_array(int *fds, size_t num_fds)
161 {
162         size_t i;
163
164         for (i = 0; i < num_fds; i++) {
165                 if (fds[i] == -1) {
166                         continue;
167                 }
168
169                 close(fds[i]);
170                 fds[i] = -1;
171         }
172 }
173
174 static void close_fd_array_dgram_msg(struct unix_dgram_msg *dmsg)
175 {
176         struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
177         struct msghdr *msg = msghdr_buf_msghdr(hdr);
178         size_t num_fds = msghdr_extract_fds(msg, NULL, 0);
179         int fds[num_fds];
180
181         msghdr_extract_fds(msg, fds, num_fds);
182
183         close_fd_array(fds, num_fds);
184 }
185
186 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
187                            const struct poll_funcs *ev_funcs,
188                            void (*recv_callback)(struct unix_dgram_ctx *ctx,
189                                                  uint8_t *msg, size_t msg_len,
190                                                  int *fds, size_t num_fds,
191                                                  void *private_data),
192                            void *private_data,
193                            struct unix_dgram_ctx **result)
194 {
195         struct unix_dgram_ctx *ctx;
196         size_t pathlen;
197         int ret;
198
199         if (addr != NULL) {
200                 pathlen = strlen(addr->sun_path)+1;
201         } else {
202                 pathlen = 1;
203         }
204
205         ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
206         if (ctx == NULL) {
207                 return ENOMEM;
208         }
209         if (addr != NULL) {
210                 memcpy(ctx->path, addr->sun_path, pathlen);
211         } else {
212                 ctx->path[0] = '\0';
213         }
214
215         *ctx = (struct unix_dgram_ctx) {
216                 .max_msg = max_msg,
217                 .ev_funcs = ev_funcs,
218                 .recv_callback = recv_callback,
219                 .private_data = private_data,
220                 .created_pid = (pid_t)-1
221         };
222
223         ctx->recv_buf = malloc(max_msg);
224         if (ctx->recv_buf == NULL) {
225                 free(ctx);
226                 return ENOMEM;
227         }
228
229         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
230         if (ctx->sock == -1) {
231                 ret = errno;
232                 goto fail_free;
233         }
234
235         /* Set non-blocking and close-on-exec. */
236         ret = prepare_socket(ctx->sock);
237         if (ret != 0) {
238                 goto fail_close;
239         }
240
241         if (addr != NULL) {
242                 ret = bind(ctx->sock,
243                            (const struct sockaddr *)(const void *)addr,
244                            sizeof(*addr));
245                 if (ret == -1) {
246                         ret = errno;
247                         goto fail_close;
248                 }
249
250                 ctx->created_pid = getpid();
251
252                 ctx->sock_read_watch = ctx->ev_funcs->watch_new(
253                         ctx->ev_funcs, ctx->sock, POLLIN,
254                         unix_dgram_recv_handler, ctx);
255
256                 if (ctx->sock_read_watch == NULL) {
257                         ret = ENOMEM;
258                         goto fail_close;
259                 }
260         }
261
262         *result = ctx;
263         return 0;
264
265 fail_close:
266         close(ctx->sock);
267 fail_free:
268         free(ctx->recv_buf);
269         free(ctx);
270         return ret;
271 }
272
273 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
274                                     void *private_data)
275 {
276         struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
277         ssize_t received;
278         int flags = 0;
279         struct msghdr msg;
280         struct iovec iov;
281         size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
282         uint8_t buf[bufsize];
283
284         iov = (struct iovec) {
285                 .iov_base = (void *)ctx->recv_buf,
286                 .iov_len = ctx->max_msg,
287         };
288
289         msg = (struct msghdr) {
290                 .msg_iov = &iov,
291                 .msg_iovlen = 1,
292         };
293
294         msghdr_prep_recv_fds(&msg, buf, bufsize, INT8_MAX);
295
296 #ifdef MSG_CMSG_CLOEXEC
297         flags |= MSG_CMSG_CLOEXEC;
298 #endif
299
300         received = recvmsg(fd, &msg, flags);
301         if (received == -1) {
302                 if ((errno == EAGAIN) ||
303                     (errno == EWOULDBLOCK) ||
304                     (errno == EINTR) || (errno == ENOMEM)) {
305                         /* Not really an error - just try again. */
306                         return;
307                 }
308                 /* Problem with the socket. Set it unreadable. */
309                 ctx->ev_funcs->watch_update(w, 0);
310                 return;
311         }
312         if (received > ctx->max_msg) {
313                 /* More than we expected, not for us */
314                 return;
315         }
316
317         {
318                 size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
319                 int fds[num_fds];
320                 int i;
321
322                 msghdr_extract_fds(&msg, fds, num_fds);
323
324                 for (i = 0; i < num_fds; i++) {
325                         int err;
326
327                         err = prepare_socket_cloexec(fds[i]);
328                         if (err != 0) {
329                                 close_fd_array(fds, num_fds);
330                                 num_fds = 0;
331                         }
332                 }
333
334                 ctx->recv_callback(ctx, ctx->recv_buf, received,
335                                    fds, num_fds, ctx->private_data);
336         }
337 }
338
339 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
340                                     void *private_data);
341
342 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
343 {
344         int ret, signalfd;
345
346         if (ctx->send_pool != NULL) {
347                 return 0;
348         }
349
350         ret = pthreadpool_pipe_init(0, &ctx->send_pool);
351         if (ret != 0) {
352                 return ret;
353         }
354
355         signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
356
357         ctx->pool_read_watch = ctx->ev_funcs->watch_new(
358                 ctx->ev_funcs, signalfd, POLLIN,
359                 unix_dgram_job_finished, ctx);
360         if (ctx->pool_read_watch == NULL) {
361                 pthreadpool_pipe_destroy(ctx->send_pool);
362                 ctx->send_pool = NULL;
363                 return ENOMEM;
364         }
365
366         return 0;
367 }
368
369 static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
370
371 static int unix_dgram_send_queue_init(
372         struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
373         struct unix_dgram_send_queue **result)
374 {
375         struct unix_dgram_send_queue *q;
376         size_t pathlen;
377         int ret, err;
378
379         pathlen = strlen(dst->sun_path)+1;
380
381         q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
382         if (q == NULL) {
383                 return ENOMEM;
384         }
385         q->ctx = ctx;
386         q->msgs = NULL;
387         q->timeout = NULL;
388         memcpy(q->path, dst->sun_path, pathlen);
389
390         q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
391         if (q->sock == -1) {
392                 err = errno;
393                 goto fail_free;
394         }
395
396         err = prepare_socket_cloexec(q->sock);
397         if (err != 0) {
398                 goto fail_close;
399         }
400
401         do {
402                 ret = connect(q->sock,
403                               (const struct sockaddr *)(const void *)dst,
404                               sizeof(*dst));
405         } while ((ret == -1) && (errno == EINTR));
406
407         if (ret == -1) {
408                 err = errno;
409                 goto fail_close;
410         }
411
412         err = unix_dgram_init_pthreadpool(ctx);
413         if (err != 0) {
414                 goto fail_close;
415         }
416
417         DLIST_ADD(ctx->send_queues, q);
418
419         ret = unix_dgram_sendq_schedule_free(q);
420         if (ret != 0) {
421                 err = ENOMEM;
422                 goto fail_close;
423         }
424
425         *result = q;
426         return 0;
427
428 fail_close:
429         close(q->sock);
430 fail_free:
431         free(q);
432         return err;
433 }
434
435 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
436 {
437         struct unix_dgram_ctx *ctx = q->ctx;
438
439         while (q->msgs != NULL) {
440                 struct unix_dgram_msg *msg;
441                 msg = q->msgs;
442                 DLIST_REMOVE(q->msgs, msg);
443                 close_fd_array_dgram_msg(msg);
444                 free(msg);
445         }
446         close(q->sock);
447         DLIST_REMOVE(ctx->send_queues, q);
448         ctx->ev_funcs->timeout_free(q->timeout);
449         free(q);
450 }
451
452 static void unix_dgram_sendq_scheduled_free_handler(
453         struct poll_timeout *t, void *private_data);
454
455 static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
456 {
457         struct unix_dgram_ctx *ctx = q->ctx;
458         struct timeval timeout;
459
460         if (q->timeout != NULL) {
461                 return 0;
462         }
463
464         GetTimeOfDay(&timeout);
465         timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
466
467         q->timeout = ctx->ev_funcs->timeout_new(
468                 ctx->ev_funcs,
469                 timeout,
470                 unix_dgram_sendq_scheduled_free_handler,
471                 q);
472         if (q->timeout == NULL) {
473                 unix_dgram_send_queue_free(q);
474                 return ENOMEM;
475         }
476
477         return 0;
478 }
479
480 static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
481                                                     void *private_data)
482 {
483         struct unix_dgram_send_queue *q = private_data;
484         int ret;
485
486         q->ctx->ev_funcs->timeout_free(q->timeout);
487         q->timeout = NULL;
488
489         if (q->msgs == NULL) {
490                 unix_dgram_send_queue_free(q);
491                 return;
492         }
493
494         ret = unix_dgram_sendq_schedule_free(q);
495         if (ret != 0) {
496                 unix_dgram_send_queue_free(q);
497                 return;
498         }
499 }
500
501 static int find_send_queue(struct unix_dgram_ctx *ctx,
502                            const struct sockaddr_un *dst,
503                            struct unix_dgram_send_queue **ps)
504 {
505         struct unix_dgram_send_queue *s;
506
507         for (s = ctx->send_queues; s != NULL; s = s->next) {
508                 if (strcmp(s->path, dst->sun_path) == 0) {
509                         *ps = s;
510                         return 0;
511                 }
512         }
513         return ENOENT;
514 }
515
516 static int queue_msg(struct unix_dgram_send_queue *q,
517                      const struct iovec *iov, int iovcnt,
518                      const int *fds, size_t num_fds)
519 {
520         struct unix_dgram_msg *msg;
521         struct msghdr_buf *hdr;
522         size_t msglen, needed;
523         ssize_t msghdrlen;
524         int fds_copy[MIN(num_fds, INT8_MAX)];
525         int i, ret;
526
527         for (i=0; i<num_fds; i++) {
528                 fds_copy[i] = -1;
529         }
530
531         for (i = 0; i < num_fds; i++) {
532                 fds_copy[i] = dup(fds[i]);
533                 if (fds_copy[i] == -1) {
534                         ret = errno;
535                         goto fail;
536                 }
537         }
538
539         msglen = unix_dgram_msg_size();
540
541         msghdrlen = msghdr_copy(NULL, 0, NULL, 0, iov, iovcnt,
542                                 fds_copy, num_fds);
543         if (msghdrlen == -1) {
544                 ret = EMSGSIZE;
545                 goto fail;
546         }
547
548         needed = msglen + msghdrlen;
549         if (needed < msglen) {
550                 ret = EMSGSIZE;
551                 goto fail;
552         }
553
554         msg = malloc(needed);
555         if (msg == NULL) {
556                 ret = ENOMEM;
557                 goto fail;
558         }
559         hdr = unix_dgram_msghdr(msg);
560
561         msg->sock = q->sock;
562         msghdr_copy(hdr, msghdrlen, NULL, 0, iov, iovcnt,
563                     fds_copy, num_fds);
564
565         DLIST_ADD_END(q->msgs, msg);
566         return 0;
567 fail:
568         close_fd_array(fds_copy, num_fds);
569         return ret;
570 }
571
572 static void unix_dgram_send_job(void *private_data)
573 {
574         struct unix_dgram_msg *dmsg = private_data;
575
576         do {
577                 struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
578                 struct msghdr *msg = msghdr_buf_msghdr(hdr);
579                 dmsg->sent = sendmsg(dmsg->sock, msg, 0);
580         } while ((dmsg->sent == -1) && (errno == EINTR));
581
582         if (dmsg->sent == -1) {
583                 dmsg->sys_errno = errno;
584         }
585 }
586
587 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
588                                     void *private_data)
589 {
590         struct unix_dgram_ctx *ctx = private_data;
591         struct unix_dgram_send_queue *q;
592         struct unix_dgram_msg *msg;
593         int ret, job;
594
595         ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
596         if (ret != 1) {
597                 return;
598         }
599
600         for (q = ctx->send_queues; q != NULL; q = q->next) {
601                 if (job == q->sock) {
602                         break;
603                 }
604         }
605
606         if (q == NULL) {
607                 /* Huh? Should not happen */
608                 return;
609         }
610
611         msg = q->msgs;
612         DLIST_REMOVE(q->msgs, msg);
613         close_fd_array_dgram_msg(msg);
614         free(msg);
615
616         if (q->msgs != NULL) {
617                 ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
618                                                unix_dgram_send_job, q->msgs);
619                 if (ret != 0) {
620                         unix_dgram_send_queue_free(q);
621                         return;
622                 }
623                 return;
624         }
625 }
626
627 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
628                            const struct sockaddr_un *dst,
629                            const struct iovec *iov, int iovlen,
630                            const int *fds, size_t num_fds)
631 {
632         struct unix_dgram_send_queue *q;
633         struct msghdr msg;
634         ssize_t fdlen;
635         int ret;
636         int i;
637
638         if (num_fds > INT8_MAX) {
639                 return EINVAL;
640         }
641
642 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
643         if (num_fds > 0) {
644                 return ENOSYS;
645         }
646 #endif
647
648         for (i = 0; i < num_fds; i++) {
649                 /*
650                  * Make sure we only allow fd passing
651                  * for communication channels,
652                  * e.g. sockets, pipes, fifos, ...
653                  */
654                 ret = lseek(fds[i], 0, SEEK_CUR);
655                 if (ret == -1 && errno == ESPIPE) {
656                         /* ok */
657                         continue;
658                 }
659
660                 /*
661                  * Reject the message as we may need to call dup(),
662                  * if we queue the message.
663                  *
664                  * That might result in unexpected behavior for the caller
665                  * for files and broken posix locking.
666                  */
667                 return EINVAL;
668         }
669
670         /*
671          * To preserve message ordering, we have to queue a message when
672          * others are waiting in line already.
673          */
674         ret = find_send_queue(ctx, dst, &q);
675         if (ret == 0) {
676                 return queue_msg(q, iov, iovlen, fds, num_fds);
677         }
678
679         /*
680          * Try a cheap nonblocking send
681          */
682
683         msg = (struct msghdr) {
684                 .msg_name = discard_const_p(struct sockaddr_un, dst),
685                 .msg_namelen = sizeof(*dst),
686                 .msg_iov = discard_const_p(struct iovec, iov),
687                 .msg_iovlen = iovlen
688         };
689
690         fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
691         if (fdlen == -1) {
692                 return EINVAL;
693         }
694
695         {
696                 uint8_t buf[fdlen];
697                 msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
698
699                 ret = sendmsg(ctx->sock, &msg, 0);
700         }
701
702         if (ret >= 0) {
703                 return 0;
704         }
705         if ((errno != EWOULDBLOCK) &&
706             (errno != EAGAIN) &&
707 #ifdef ENOBUFS
708             /* FreeBSD can give this for large messages */
709             (errno != ENOBUFS) &&
710 #endif
711             (errno != EINTR)) {
712                 return errno;
713         }
714
715         ret = unix_dgram_send_queue_init(ctx, dst, &q);
716         if (ret != 0) {
717                 return ret;
718         }
719         ret = queue_msg(q, iov, iovlen, fds, num_fds);
720         if (ret != 0) {
721                 unix_dgram_send_queue_free(q);
722                 return ret;
723         }
724         ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
725                                        unix_dgram_send_job, q->msgs);
726         if (ret != 0) {
727                 unix_dgram_send_queue_free(q);
728                 return ret;
729         }
730         return 0;
731 }
732
733 static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
734 {
735         return ctx->sock;
736 }
737
738 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
739 {
740         if (ctx->send_queues != NULL) {
741                 return EBUSY;
742         }
743
744         if (ctx->send_pool != NULL) {
745                 int ret = pthreadpool_pipe_destroy(ctx->send_pool);
746                 if (ret != 0) {
747                         return ret;
748                 }
749                 ctx->ev_funcs->watch_free(ctx->pool_read_watch);
750         }
751
752         ctx->ev_funcs->watch_free(ctx->sock_read_watch);
753
754         close(ctx->sock);
755         if (getpid() == ctx->created_pid) {
756                 /* If we created it, unlink. Otherwise someone else might
757                  * still have it open */
758                 unlink(ctx->path);
759         }
760
761         free(ctx->recv_buf);
762         free(ctx);
763         return 0;
764 }
765
766 /*
767  * Every message starts with a uint64_t cookie.
768  *
769  * A value of 0 indicates a single-fragment message which is complete in
770  * itself. The data immediately follows the cookie.
771  *
772  * Every multi-fragment message has a cookie != 0 and starts with a cookie
773  * followed by a struct unix_msg_header and then the data. The pid and sock
774  * fields are used to assure uniqueness on the receiver side.
775  */
776
777 struct unix_msg_hdr {
778         size_t msglen;
779         pid_t pid;
780         int sock;
781 };
782
783 struct unix_msg {
784         struct unix_msg *prev, *next;
785         size_t msglen;
786         size_t received;
787         pid_t sender_pid;
788         int sender_sock;
789         uint64_t cookie;
790         uint8_t buf[1];
791 };
792
793 struct unix_msg_ctx {
794         struct unix_dgram_ctx *dgram;
795         size_t fragment_len;
796         uint64_t cookie;
797
798         void (*recv_callback)(struct unix_msg_ctx *ctx,
799                               uint8_t *msg, size_t msg_len,
800                               int *fds, size_t num_fds,
801                               void *private_data);
802         void *private_data;
803
804         struct unix_msg *msgs;
805 };
806
807 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
808                           uint8_t *buf, size_t buflen,
809                           int *fds, size_t num_fds,
810                           void *private_data);
811
812 int unix_msg_init(const struct sockaddr_un *addr,
813                   const struct poll_funcs *ev_funcs,
814                   size_t fragment_len,
815                   void (*recv_callback)(struct unix_msg_ctx *ctx,
816                                         uint8_t *msg, size_t msg_len,
817                                         int *fds, size_t num_fds,
818                                         void *private_data),
819                   void *private_data,
820                   struct unix_msg_ctx **result)
821 {
822         struct unix_msg_ctx *ctx;
823         int ret;
824
825         ctx = malloc(sizeof(*ctx));
826         if (ctx == NULL) {
827                 return ENOMEM;
828         }
829
830         *ctx = (struct unix_msg_ctx) {
831                 .fragment_len = fragment_len,
832                 .cookie = 1,
833                 .recv_callback = recv_callback,
834                 .private_data = private_data
835         };
836
837         ret = unix_dgram_init(addr, fragment_len, ev_funcs,
838                               unix_msg_recv, ctx, &ctx->dgram);
839         if (ret != 0) {
840                 free(ctx);
841                 return ret;
842         }
843
844         *result = ctx;
845         return 0;
846 }
847
848 int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
849                   const struct iovec *iov, int iovlen,
850                   const int *fds, size_t num_fds)
851 {
852         ssize_t msglen;
853         size_t sent;
854         int ret = 0;
855         struct iovec iov_copy[iovlen+2];
856         struct unix_msg_hdr hdr;
857         struct iovec src_iov;
858
859         if (iovlen < 0) {
860                 return EINVAL;
861         }
862
863         msglen = iov_buflen(iov, iovlen);
864         if (msglen == -1) {
865                 return EINVAL;
866         }
867
868         if (num_fds > INT8_MAX) {
869                 return EINVAL;
870         }
871
872         if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
873                 uint64_t cookie = 0;
874
875                 iov_copy[0].iov_base = &cookie;
876                 iov_copy[0].iov_len = sizeof(cookie);
877                 if (iovlen > 0) {
878                         memcpy(&iov_copy[1], iov,
879                                sizeof(struct iovec) * iovlen);
880                 }
881
882                 return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
883                                        fds, num_fds);
884         }
885
886         hdr = (struct unix_msg_hdr) {
887                 .msglen = msglen,
888                 .pid = getpid(),
889                 .sock = unix_dgram_sock(ctx->dgram)
890         };
891
892         iov_copy[0].iov_base = &ctx->cookie;
893         iov_copy[0].iov_len = sizeof(ctx->cookie);
894         iov_copy[1].iov_base = &hdr;
895         iov_copy[1].iov_len = sizeof(hdr);
896
897         sent = 0;
898         src_iov = iov[0];
899
900         /*
901          * The following write loop sends the user message in pieces. We have
902          * filled the first two iovecs above with "cookie" and "hdr". In the
903          * following loops we pull message chunks from the user iov array and
904          * fill iov_copy piece by piece, possibly truncating chunks from the
905          * caller's iov array. Ugly, but hopefully efficient.
906          */
907
908         while (sent < msglen) {
909                 size_t fragment_len;
910                 size_t iov_index = 2;
911
912                 fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
913
914                 while (fragment_len < ctx->fragment_len) {
915                         size_t space, chunk;
916
917                         space = ctx->fragment_len - fragment_len;
918                         chunk = MIN(space, src_iov.iov_len);
919
920                         iov_copy[iov_index].iov_base = src_iov.iov_base;
921                         iov_copy[iov_index].iov_len = chunk;
922                         iov_index += 1;
923
924                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
925                         src_iov.iov_len -= chunk;
926                         fragment_len += chunk;
927
928                         if (src_iov.iov_len == 0) {
929                                 iov += 1;
930                                 iovlen -= 1;
931                                 if (iovlen == 0) {
932                                         break;
933                                 }
934                                 src_iov = iov[0];
935                         }
936                 }
937                 sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
938
939                 /*
940                  * only the last fragment should pass the fd array.
941                  * That simplifies the receiver a lot.
942                  */
943                 if (sent < msglen) {
944                         ret = unix_dgram_send(ctx->dgram, dst,
945                                               iov_copy, iov_index,
946                                               NULL, 0);
947                 } else {
948                         ret = unix_dgram_send(ctx->dgram, dst,
949                                               iov_copy, iov_index,
950                                               fds, num_fds);
951                 }
952                 if (ret != 0) {
953                         break;
954                 }
955         }
956
957         ctx->cookie += 1;
958         if (ctx->cookie == 0) {
959                 ctx->cookie += 1;
960         }
961
962         return ret;
963 }
964
965 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
966                           uint8_t *buf, size_t buflen,
967                           int *fds, size_t num_fds,
968                           void *private_data)
969 {
970         struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
971         struct unix_msg_hdr hdr;
972         struct unix_msg *msg;
973         size_t space;
974         uint64_t cookie;
975
976         if (buflen < sizeof(cookie)) {
977                 goto close_fds;
978         }
979
980         memcpy(&cookie, buf, sizeof(cookie));
981
982         buf += sizeof(cookie);
983         buflen -= sizeof(cookie);
984
985         if (cookie == 0) {
986                 ctx->recv_callback(ctx, buf, buflen, fds, num_fds,
987                                    ctx->private_data);
988                 return;
989         }
990
991         if (buflen < sizeof(hdr)) {
992                 goto close_fds;
993         }
994         memcpy(&hdr, buf, sizeof(hdr));
995
996         buf += sizeof(hdr);
997         buflen -= sizeof(hdr);
998
999         for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
1000                 if ((msg->sender_pid == hdr.pid) &&
1001                     (msg->sender_sock == hdr.sock)) {
1002                         break;
1003                 }
1004         }
1005
1006         if ((msg != NULL) && (msg->cookie != cookie)) {
1007                 DLIST_REMOVE(ctx->msgs, msg);
1008                 free(msg);
1009                 msg = NULL;
1010         }
1011
1012         if (msg == NULL) {
1013                 msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
1014                 if (msg == NULL) {
1015                         goto close_fds;
1016                 }
1017                 *msg = (struct unix_msg) {
1018                         .msglen = hdr.msglen,
1019                         .sender_pid = hdr.pid,
1020                         .sender_sock = hdr.sock,
1021                         .cookie = cookie
1022                 };
1023                 DLIST_ADD(ctx->msgs, msg);
1024         }
1025
1026         space = msg->msglen - msg->received;
1027         if (buflen > space) {
1028                 goto close_fds;
1029         }
1030
1031         memcpy(msg->buf + msg->received, buf, buflen);
1032         msg->received += buflen;
1033
1034         if (msg->received < msg->msglen) {
1035                 goto close_fds;
1036         }
1037
1038         DLIST_REMOVE(ctx->msgs, msg);
1039         ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds,
1040                            ctx->private_data);
1041         free(msg);
1042         return;
1043
1044 close_fds:
1045         close_fd_array(fds, num_fds);
1046 }
1047
1048 int unix_msg_free(struct unix_msg_ctx *ctx)
1049 {
1050         int ret;
1051
1052         ret = unix_dgram_free(ctx->dgram);
1053         if (ret != 0) {
1054                 return ret;
1055         }
1056
1057         while (ctx->msgs != NULL) {
1058                 struct unix_msg *msg = ctx->msgs;
1059                 DLIST_REMOVE(ctx->msgs, msg);
1060                 free(msg);
1061         }
1062
1063         free(ctx);
1064         return 0;
1065 }