Use full path to dlinklist.h in includes.
[obnox/samba/samba-obnox.git] / source3 / lib / unix_msg / unix_msg.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Copyright (C) Volker Lendecke 2013
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 #include "replace.h"
20 #include "unix_msg.h"
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "lib/util/dlinklist.h"
25 #include "pthreadpool/pthreadpool.h"
26 #include "lib/util/iov_buf.h"
27 #include "lib/msghdr.h"
28 #include <fcntl.h>
29
30 /*
31  * This file implements two abstractions: The "unix_dgram" functions implement
32  * queueing for unix domain datagram sockets. You can send to a destination
33  * socket, and if that has no free space available, it will fall back to an
34  * anonymous socket that will poll for writability. "unix_dgram" expects the
35  * data size not to exceed the system limit.
36  *
37  * The "unix_msg" functions implement the fragmentation of large messages on
38  * top of "unix_dgram". This is what is exposed to the user of this API.
39  */
40
41 struct unix_dgram_msg {
42         struct unix_dgram_msg *prev, *next;
43
44         int sock;
45         ssize_t sent;
46         int sys_errno;
47 };
48
49 struct unix_dgram_send_queue {
50         struct unix_dgram_send_queue *prev, *next;
51         struct unix_dgram_ctx *ctx;
52         int sock;
53         struct unix_dgram_msg *msgs;
54         char path[];
55 };
56
57 struct unix_dgram_ctx {
58         int sock;
59         pid_t created_pid;
60         const struct poll_funcs *ev_funcs;
61         size_t max_msg;
62
63         void (*recv_callback)(struct unix_dgram_ctx *ctx,
64                               uint8_t *msg, size_t msg_len,
65                               int *fds, size_t num_fds,
66                               void *private_data);
67         void *private_data;
68
69         struct poll_watch *sock_read_watch;
70         struct unix_dgram_send_queue *send_queues;
71
72         struct pthreadpool *send_pool;
73         struct poll_watch *pool_read_watch;
74
75         uint8_t *recv_buf;
76         char path[];
77 };
78
79 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
80                                     void *private_data);
81
82 /* Set socket non blocking. */
83 static int prepare_socket_nonblock(int sock)
84 {
85         int flags;
86 #ifdef O_NONBLOCK
87 #define FLAG_TO_SET O_NONBLOCK
88 #else
89 #ifdef SYSV
90 #define FLAG_TO_SET O_NDELAY
91 #else /* BSD */
92 #define FLAG_TO_SET FNDELAY
93 #endif
94 #endif
95
96         flags = fcntl(sock, F_GETFL);
97         if (flags == -1) {
98                 return errno;
99         }
100         flags |= FLAG_TO_SET;
101         if (fcntl(sock, F_SETFL, flags) == -1) {
102                 return errno;
103         }
104
105 #undef FLAG_TO_SET
106         return 0;
107 }
108
109 /* Set socket close on exec. */
110 static int prepare_socket_cloexec(int sock)
111 {
112 #ifdef FD_CLOEXEC
113         int flags;
114
115         flags = fcntl(sock, F_GETFD, 0);
116         if (flags == -1) {
117                 return errno;
118         }
119         flags |= FD_CLOEXEC;
120         if (fcntl(sock, F_SETFD, flags) == -1) {
121                 return errno;
122         }
123 #endif
124         return 0;
125 }
126
127 /* Set socket non blocking and close on exec. */
128 static int prepare_socket(int sock)
129 {
130         int ret = prepare_socket_nonblock(sock);
131
132         if (ret) {
133                 return ret;
134         }
135         return prepare_socket_cloexec(sock);
136 }
137
138 static size_t unix_dgram_msg_size(void)
139 {
140         size_t msgsize = sizeof(struct unix_dgram_msg);
141         msgsize = (msgsize + 15) & ~15; /* align to 16 */
142         return msgsize;
143 }
144
145 static struct msghdr_buf *unix_dgram_msghdr(struct unix_dgram_msg *msg)
146 {
147         /*
148          * Not portable in C99, but "msg" is aligned and so is
149          * unix_dgram_msg_size()
150          */
151         return (struct msghdr_buf *)(((char *)msg) + unix_dgram_msg_size());
152 }
153
154 static void close_fd_array(int *fds, size_t num_fds)
155 {
156         size_t i;
157
158         for (i = 0; i < num_fds; i++) {
159                 if (fds[i] == -1) {
160                         continue;
161                 }
162
163                 close(fds[i]);
164                 fds[i] = -1;
165         }
166 }
167
168 static void close_fd_array_dgram_msg(struct unix_dgram_msg *dmsg)
169 {
170         struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
171         struct msghdr *msg = msghdr_buf_msghdr(hdr);
172         size_t num_fds = msghdr_extract_fds(msg, NULL, 0);
173         int fds[num_fds];
174
175         msghdr_extract_fds(msg, fds, num_fds);
176
177         close_fd_array(fds, num_fds);
178 }
179
180 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
181                            const struct poll_funcs *ev_funcs,
182                            void (*recv_callback)(struct unix_dgram_ctx *ctx,
183                                                  uint8_t *msg, size_t msg_len,
184                                                  int *fds, size_t num_fds,
185                                                  void *private_data),
186                            void *private_data,
187                            struct unix_dgram_ctx **result)
188 {
189         struct unix_dgram_ctx *ctx;
190         size_t pathlen;
191         int ret;
192
193         if (addr != NULL) {
194                 pathlen = strlen(addr->sun_path)+1;
195         } else {
196                 pathlen = 1;
197         }
198
199         ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
200         if (ctx == NULL) {
201                 return ENOMEM;
202         }
203         if (addr != NULL) {
204                 memcpy(ctx->path, addr->sun_path, pathlen);
205         } else {
206                 ctx->path[0] = '\0';
207         }
208
209         *ctx = (struct unix_dgram_ctx) {
210                 .max_msg = max_msg,
211                 .ev_funcs = ev_funcs,
212                 .recv_callback = recv_callback,
213                 .private_data = private_data,
214                 .created_pid = (pid_t)-1
215         };
216
217         ctx->recv_buf = malloc(max_msg);
218         if (ctx->recv_buf == NULL) {
219                 free(ctx);
220                 return ENOMEM;
221         }
222
223         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
224         if (ctx->sock == -1) {
225                 ret = errno;
226                 goto fail_free;
227         }
228
229         /* Set non-blocking and close-on-exec. */
230         ret = prepare_socket(ctx->sock);
231         if (ret != 0) {
232                 goto fail_close;
233         }
234
235         if (addr != NULL) {
236                 ret = bind(ctx->sock,
237                            (const struct sockaddr *)(const void *)addr,
238                            sizeof(*addr));
239                 if (ret == -1) {
240                         ret = errno;
241                         goto fail_close;
242                 }
243
244                 ctx->created_pid = getpid();
245
246                 ctx->sock_read_watch = ctx->ev_funcs->watch_new(
247                         ctx->ev_funcs, ctx->sock, POLLIN,
248                         unix_dgram_recv_handler, ctx);
249
250                 if (ctx->sock_read_watch == NULL) {
251                         ret = ENOMEM;
252                         goto fail_close;
253                 }
254         }
255
256         *result = ctx;
257         return 0;
258
259 fail_close:
260         close(ctx->sock);
261 fail_free:
262         free(ctx->recv_buf);
263         free(ctx);
264         return ret;
265 }
266
267 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
268                                     void *private_data)
269 {
270         struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
271         ssize_t received;
272         int flags = 0;
273         struct msghdr msg;
274         struct iovec iov;
275         size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
276         uint8_t buf[bufsize];
277
278         iov = (struct iovec) {
279                 .iov_base = (void *)ctx->recv_buf,
280                 .iov_len = ctx->max_msg,
281         };
282
283         msg = (struct msghdr) {
284                 .msg_iov = &iov,
285                 .msg_iovlen = 1,
286         };
287
288         msghdr_prep_recv_fds(&msg, buf, bufsize, INT8_MAX);
289
290 #ifdef MSG_CMSG_CLOEXEC
291         flags |= MSG_CMSG_CLOEXEC;
292 #endif
293
294         received = recvmsg(fd, &msg, flags);
295         if (received == -1) {
296                 if ((errno == EAGAIN) ||
297                     (errno == EWOULDBLOCK) ||
298                     (errno == EINTR) || (errno == ENOMEM)) {
299                         /* Not really an error - just try again. */
300                         return;
301                 }
302                 /* Problem with the socket. Set it unreadable. */
303                 ctx->ev_funcs->watch_update(w, 0);
304                 return;
305         }
306         if (received > ctx->max_msg) {
307                 /* More than we expected, not for us */
308                 return;
309         }
310
311         {
312                 size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
313                 int fds[num_fds];
314                 int i;
315
316                 msghdr_extract_fds(&msg, fds, num_fds);
317
318                 for (i = 0; i < num_fds; i++) {
319                         int err;
320
321                         err = prepare_socket_cloexec(fds[i]);
322                         if (err != 0) {
323                                 close_fd_array(fds, num_fds);
324                                 num_fds = 0;
325                         }
326                 }
327
328                 ctx->recv_callback(ctx, ctx->recv_buf, received,
329                                    fds, num_fds, ctx->private_data);
330         }
331 }
332
333 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
334                                     void *private_data);
335
336 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
337 {
338         int ret, signalfd;
339
340         if (ctx->send_pool != NULL) {
341                 return 0;
342         }
343
344         ret = pthreadpool_init(0, &ctx->send_pool);
345         if (ret != 0) {
346                 return ret;
347         }
348
349         signalfd = pthreadpool_signal_fd(ctx->send_pool);
350
351         ctx->pool_read_watch = ctx->ev_funcs->watch_new(
352                 ctx->ev_funcs, signalfd, POLLIN,
353                 unix_dgram_job_finished, ctx);
354         if (ctx->pool_read_watch == NULL) {
355                 pthreadpool_destroy(ctx->send_pool);
356                 ctx->send_pool = NULL;
357                 return ENOMEM;
358         }
359
360         return 0;
361 }
362
363 static int unix_dgram_send_queue_init(
364         struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
365         struct unix_dgram_send_queue **result)
366 {
367         struct unix_dgram_send_queue *q;
368         size_t pathlen;
369         int ret, err;
370
371         pathlen = strlen(dst->sun_path)+1;
372
373         q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
374         if (q == NULL) {
375                 return ENOMEM;
376         }
377         q->ctx = ctx;
378         q->msgs = NULL;
379         memcpy(q->path, dst->sun_path, pathlen);
380
381         q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
382         if (q->sock == -1) {
383                 err = errno;
384                 goto fail_free;
385         }
386
387         err = prepare_socket_cloexec(q->sock);
388         if (err != 0) {
389                 goto fail_close;
390         }
391
392         do {
393                 ret = connect(q->sock,
394                               (const struct sockaddr *)(const void *)dst,
395                               sizeof(*dst));
396         } while ((ret == -1) && (errno == EINTR));
397
398         if (ret == -1) {
399                 err = errno;
400                 goto fail_close;
401         }
402
403         err = unix_dgram_init_pthreadpool(ctx);
404         if (err != 0) {
405                 goto fail_close;
406         }
407
408         DLIST_ADD(ctx->send_queues, q);
409
410         *result = q;
411         return 0;
412
413 fail_close:
414         close(q->sock);
415 fail_free:
416         free(q);
417         return err;
418 }
419
420 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
421 {
422         struct unix_dgram_ctx *ctx = q->ctx;
423
424         while (q->msgs != NULL) {
425                 struct unix_dgram_msg *msg;
426                 msg = q->msgs;
427                 DLIST_REMOVE(q->msgs, msg);
428                 close_fd_array_dgram_msg(msg);
429                 free(msg);
430         }
431         close(q->sock);
432         DLIST_REMOVE(ctx->send_queues, q);
433         free(q);
434 }
435
436 static struct unix_dgram_send_queue *find_send_queue(
437         struct unix_dgram_ctx *ctx, const char *dst_sock)
438 {
439         struct unix_dgram_send_queue *s;
440
441         for (s = ctx->send_queues; s != NULL; s = s->next) {
442                 if (strcmp(s->path, dst_sock) == 0) {
443                         return s;
444                 }
445         }
446         return NULL;
447 }
448
449 static int queue_msg(struct unix_dgram_send_queue *q,
450                      const struct iovec *iov, int iovcnt,
451                      const int *fds, size_t num_fds)
452 {
453         struct unix_dgram_msg *msg;
454         struct msghdr_buf *hdr;
455         size_t msglen, needed;
456         ssize_t msghdrlen;
457         int fds_copy[MIN(num_fds, INT8_MAX)];
458         int i, ret;
459
460         for (i=0; i<num_fds; i++) {
461                 fds_copy[i] = -1;
462         }
463
464         for (i = 0; i < num_fds; i++) {
465                 fds_copy[i] = dup(fds[i]);
466                 if (fds_copy[i] == -1) {
467                         ret = errno;
468                         goto fail;
469                 }
470         }
471
472         msglen = unix_dgram_msg_size();
473
474         msghdrlen = msghdr_copy(NULL, 0, NULL, 0, iov, iovcnt,
475                                 fds_copy, num_fds);
476         if (msghdrlen == -1) {
477                 ret = EMSGSIZE;
478                 goto fail;
479         }
480
481         needed = msglen + msghdrlen;
482         if (needed < msglen) {
483                 ret = EMSGSIZE;
484                 goto fail;
485         }
486
487         msg = malloc(needed);
488         if (msg == NULL) {
489                 ret = ENOMEM;
490                 goto fail;
491         }
492         hdr = unix_dgram_msghdr(msg);
493
494         msg->sock = q->sock;
495         msghdr_copy(hdr, msghdrlen, NULL, 0, iov, iovcnt,
496                     fds_copy, num_fds);
497
498         DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
499         return 0;
500 fail:
501         close_fd_array(fds_copy, num_fds);
502         return ret;
503 }
504
505 static void unix_dgram_send_job(void *private_data)
506 {
507         struct unix_dgram_msg *dmsg = private_data;
508
509         do {
510                 struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
511                 struct msghdr *msg = msghdr_buf_msghdr(hdr);
512                 dmsg->sent = sendmsg(dmsg->sock, msg, 0);
513         } while ((dmsg->sent == -1) && (errno == EINTR));
514
515         if (dmsg->sent == -1) {
516                 dmsg->sys_errno = errno;
517         }
518 }
519
520 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
521                                     void *private_data)
522 {
523         struct unix_dgram_ctx *ctx = private_data;
524         struct unix_dgram_send_queue *q;
525         struct unix_dgram_msg *msg;
526         int ret, job;
527
528         ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
529         if (ret != 1) {
530                 return;
531         }
532
533         for (q = ctx->send_queues; q != NULL; q = q->next) {
534                 if (job == q->sock) {
535                         break;
536                 }
537         }
538
539         if (q == NULL) {
540                 /* Huh? Should not happen */
541                 return;
542         }
543
544         msg = q->msgs;
545         DLIST_REMOVE(q->msgs, msg);
546         close_fd_array_dgram_msg(msg);
547         free(msg);
548
549         if (q->msgs != NULL) {
550                 ret = pthreadpool_add_job(ctx->send_pool, q->sock,
551                                           unix_dgram_send_job, q->msgs);
552                 if (ret == 0) {
553                         return;
554                 }
555         }
556
557         unix_dgram_send_queue_free(q);
558 }
559
560 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
561                            const struct sockaddr_un *dst,
562                            const struct iovec *iov, int iovlen,
563                            const int *fds, size_t num_fds)
564 {
565         struct unix_dgram_send_queue *q;
566         struct msghdr msg;
567         ssize_t fdlen;
568         int ret;
569         int i;
570
571         if (num_fds > INT8_MAX) {
572                 return EINVAL;
573         }
574
575 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
576         if (num_fds > 0) {
577                 return ENOSYS;
578         }
579 #endif
580
581         for (i = 0; i < num_fds; i++) {
582                 /*
583                  * Make sure we only allow fd passing
584                  * for communication channels,
585                  * e.g. sockets, pipes, fifos, ...
586                  */
587                 ret = lseek(fds[i], 0, SEEK_CUR);
588                 if (ret == -1 && errno == ESPIPE) {
589                         /* ok */
590                         continue;
591                 }
592
593                 /*
594                  * Reject the message as we may need to call dup(),
595                  * if we queue the message.
596                  *
597                  * That might result in unexpected behavior for the caller
598                  * for files and broken posix locking.
599                  */
600                 return EINVAL;
601         }
602
603         /*
604          * To preserve message ordering, we have to queue a message when
605          * others are waiting in line already.
606          */
607         q = find_send_queue(ctx, dst->sun_path);
608         if (q != NULL) {
609                 return queue_msg(q, iov, iovlen, fds, num_fds);
610         }
611
612         /*
613          * Try a cheap nonblocking send
614          */
615
616         msg = (struct msghdr) {
617                 .msg_name = discard_const_p(struct sockaddr_un, dst),
618                 .msg_namelen = sizeof(*dst),
619                 .msg_iov = discard_const_p(struct iovec, iov),
620                 .msg_iovlen = iovlen
621         };
622
623         fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
624         if (fdlen == -1) {
625                 return EINVAL;
626         }
627
628         {
629                 uint8_t buf[fdlen];
630                 msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
631
632                 ret = sendmsg(ctx->sock, &msg, 0);
633         }
634
635         if (ret >= 0) {
636                 return 0;
637         }
638         if ((errno != EWOULDBLOCK) &&
639             (errno != EAGAIN) &&
640 #ifdef ENOBUFS
641             /* FreeBSD can give this for large messages */
642             (errno != ENOBUFS) &&
643 #endif
644             (errno != EINTR)) {
645                 return errno;
646         }
647
648         ret = unix_dgram_send_queue_init(ctx, dst, &q);
649         if (ret != 0) {
650                 return ret;
651         }
652         ret = queue_msg(q, iov, iovlen, fds, num_fds);
653         if (ret != 0) {
654                 unix_dgram_send_queue_free(q);
655                 return ret;
656         }
657         ret = pthreadpool_add_job(ctx->send_pool, q->sock,
658                                   unix_dgram_send_job, q->msgs);
659         if (ret != 0) {
660                 unix_dgram_send_queue_free(q);
661                 return ret;
662         }
663         return 0;
664 }
665
666 static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
667 {
668         return ctx->sock;
669 }
670
671 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
672 {
673         if (ctx->send_queues != NULL) {
674                 return EBUSY;
675         }
676
677         if (ctx->send_pool != NULL) {
678                 int ret = pthreadpool_destroy(ctx->send_pool);
679                 if (ret != 0) {
680                         return ret;
681                 }
682                 ctx->ev_funcs->watch_free(ctx->pool_read_watch);
683         }
684
685         ctx->ev_funcs->watch_free(ctx->sock_read_watch);
686
687         close(ctx->sock);
688         if (getpid() == ctx->created_pid) {
689                 /* If we created it, unlink. Otherwise someone else might
690                  * still have it open */
691                 unlink(ctx->path);
692         }
693
694         free(ctx->recv_buf);
695         free(ctx);
696         return 0;
697 }
698
699 /*
700  * Every message starts with a uint64_t cookie.
701  *
702  * A value of 0 indicates a single-fragment message which is complete in
703  * itself. The data immediately follows the cookie.
704  *
705  * Every multi-fragment message has a cookie != 0 and starts with a cookie
706  * followed by a struct unix_msg_header and then the data. The pid and sock
707  * fields are used to assure uniqueness on the receiver side.
708  */
709
710 struct unix_msg_hdr {
711         size_t msglen;
712         pid_t pid;
713         int sock;
714 };
715
716 struct unix_msg {
717         struct unix_msg *prev, *next;
718         size_t msglen;
719         size_t received;
720         pid_t sender_pid;
721         int sender_sock;
722         uint64_t cookie;
723         uint8_t buf[1];
724 };
725
726 struct unix_msg_ctx {
727         struct unix_dgram_ctx *dgram;
728         size_t fragment_len;
729         uint64_t cookie;
730
731         void (*recv_callback)(struct unix_msg_ctx *ctx,
732                               uint8_t *msg, size_t msg_len,
733                               int *fds, size_t num_fds,
734                               void *private_data);
735         void *private_data;
736
737         struct unix_msg *msgs;
738 };
739
740 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
741                           uint8_t *buf, size_t buflen,
742                           int *fds, size_t num_fds,
743                           void *private_data);
744
745 int unix_msg_init(const struct sockaddr_un *addr,
746                   const struct poll_funcs *ev_funcs,
747                   size_t fragment_len,
748                   void (*recv_callback)(struct unix_msg_ctx *ctx,
749                                         uint8_t *msg, size_t msg_len,
750                                         int *fds, size_t num_fds,
751                                         void *private_data),
752                   void *private_data,
753                   struct unix_msg_ctx **result)
754 {
755         struct unix_msg_ctx *ctx;
756         int ret;
757
758         ctx = malloc(sizeof(*ctx));
759         if (ctx == NULL) {
760                 return ENOMEM;
761         }
762
763         *ctx = (struct unix_msg_ctx) {
764                 .fragment_len = fragment_len,
765                 .cookie = 1,
766                 .recv_callback = recv_callback,
767                 .private_data = private_data
768         };
769
770         ret = unix_dgram_init(addr, fragment_len, ev_funcs,
771                               unix_msg_recv, ctx, &ctx->dgram);
772         if (ret != 0) {
773                 free(ctx);
774                 return ret;
775         }
776
777         *result = ctx;
778         return 0;
779 }
780
781 int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
782                   const struct iovec *iov, int iovlen,
783                   const int *fds, size_t num_fds)
784 {
785         ssize_t msglen;
786         size_t sent;
787         int ret = 0;
788         struct iovec iov_copy[iovlen+2];
789         struct unix_msg_hdr hdr;
790         struct iovec src_iov;
791
792         if (iovlen < 0) {
793                 return EINVAL;
794         }
795
796         msglen = iov_buflen(iov, iovlen);
797         if (msglen == -1) {
798                 return EINVAL;
799         }
800
801         if (num_fds > INT8_MAX) {
802                 return EINVAL;
803         }
804
805         if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
806                 uint64_t cookie = 0;
807
808                 iov_copy[0].iov_base = &cookie;
809                 iov_copy[0].iov_len = sizeof(cookie);
810                 if (iovlen > 0) {
811                         memcpy(&iov_copy[1], iov,
812                                sizeof(struct iovec) * iovlen);
813                 }
814
815                 return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
816                                        fds, num_fds);
817         }
818
819         hdr = (struct unix_msg_hdr) {
820                 .msglen = msglen,
821                 .pid = getpid(),
822                 .sock = unix_dgram_sock(ctx->dgram)
823         };
824
825         iov_copy[0].iov_base = &ctx->cookie;
826         iov_copy[0].iov_len = sizeof(ctx->cookie);
827         iov_copy[1].iov_base = &hdr;
828         iov_copy[1].iov_len = sizeof(hdr);
829
830         sent = 0;
831         src_iov = iov[0];
832
833         /*
834          * The following write loop sends the user message in pieces. We have
835          * filled the first two iovecs above with "cookie" and "hdr". In the
836          * following loops we pull message chunks from the user iov array and
837          * fill iov_copy piece by piece, possibly truncating chunks from the
838          * caller's iov array. Ugly, but hopefully efficient.
839          */
840
841         while (sent < msglen) {
842                 size_t fragment_len;
843                 size_t iov_index = 2;
844
845                 fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
846
847                 while (fragment_len < ctx->fragment_len) {
848                         size_t space, chunk;
849
850                         space = ctx->fragment_len - fragment_len;
851                         chunk = MIN(space, src_iov.iov_len);
852
853                         iov_copy[iov_index].iov_base = src_iov.iov_base;
854                         iov_copy[iov_index].iov_len = chunk;
855                         iov_index += 1;
856
857                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
858                         src_iov.iov_len -= chunk;
859                         fragment_len += chunk;
860
861                         if (src_iov.iov_len == 0) {
862                                 iov += 1;
863                                 iovlen -= 1;
864                                 if (iovlen == 0) {
865                                         break;
866                                 }
867                                 src_iov = iov[0];
868                         }
869                 }
870                 sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
871
872                 /*
873                  * only the last fragment should pass the fd array.
874                  * That simplifies the receiver a lot.
875                  */
876                 if (sent < msglen) {
877                         ret = unix_dgram_send(ctx->dgram, dst,
878                                               iov_copy, iov_index,
879                                               NULL, 0);
880                 } else {
881                         ret = unix_dgram_send(ctx->dgram, dst,
882                                               iov_copy, iov_index,
883                                               fds, num_fds);
884                 }
885                 if (ret != 0) {
886                         break;
887                 }
888         }
889
890         ctx->cookie += 1;
891         if (ctx->cookie == 0) {
892                 ctx->cookie += 1;
893         }
894
895         return ret;
896 }
897
898 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
899                           uint8_t *buf, size_t buflen,
900                           int *fds, size_t num_fds,
901                           void *private_data)
902 {
903         struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
904         struct unix_msg_hdr hdr;
905         struct unix_msg *msg;
906         size_t space;
907         uint64_t cookie;
908
909         if (buflen < sizeof(cookie)) {
910                 goto close_fds;
911         }
912
913         memcpy(&cookie, buf, sizeof(cookie));
914
915         buf += sizeof(cookie);
916         buflen -= sizeof(cookie);
917
918         if (cookie == 0) {
919                 ctx->recv_callback(ctx, buf, buflen, fds, num_fds,
920                                    ctx->private_data);
921                 return;
922         }
923
924         if (buflen < sizeof(hdr)) {
925                 goto close_fds;
926         }
927         memcpy(&hdr, buf, sizeof(hdr));
928
929         buf += sizeof(hdr);
930         buflen -= sizeof(hdr);
931
932         for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
933                 if ((msg->sender_pid == hdr.pid) &&
934                     (msg->sender_sock == hdr.sock)) {
935                         break;
936                 }
937         }
938
939         if ((msg != NULL) && (msg->cookie != cookie)) {
940                 DLIST_REMOVE(ctx->msgs, msg);
941                 free(msg);
942                 msg = NULL;
943         }
944
945         if (msg == NULL) {
946                 msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
947                 if (msg == NULL) {
948                         goto close_fds;
949                 }
950                 *msg = (struct unix_msg) {
951                         .msglen = hdr.msglen,
952                         .sender_pid = hdr.pid,
953                         .sender_sock = hdr.sock,
954                         .cookie = cookie
955                 };
956                 DLIST_ADD(ctx->msgs, msg);
957         }
958
959         space = msg->msglen - msg->received;
960         if (buflen > space) {
961                 goto close_fds;
962         }
963
964         memcpy(msg->buf + msg->received, buf, buflen);
965         msg->received += buflen;
966
967         if (msg->received < msg->msglen) {
968                 goto close_fds;
969         }
970
971         DLIST_REMOVE(ctx->msgs, msg);
972         ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds,
973                            ctx->private_data);
974         free(msg);
975         return;
976
977 close_fds:
978         close_fd_array(fds, num_fds);
979 }
980
981 int unix_msg_free(struct unix_msg_ctx *ctx)
982 {
983         int ret;
984
985         ret = unix_dgram_free(ctx->dgram);
986         if (ret != 0) {
987                 return ret;
988         }
989
990         while (ctx->msgs != NULL) {
991                 struct unix_msg *msg = ctx->msgs;
992                 DLIST_REMOVE(ctx->msgs, msg);
993                 free(msg);
994         }
995
996         free(ctx);
997         return 0;
998 }