]> git.samba.org - obnox/samba/samba-obnox.git/blob - source3/lib/unix_msg/unix_msg.c
s3:unix_msg: factor extract_fd_array_from_msghdr() out of unix_dgram_recv_handler()
[obnox/samba/samba-obnox.git] / source3 / lib / unix_msg / unix_msg.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Copyright (C) Volker Lendecke 2013
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 #include "replace.h"
20 #include "unix_msg.h"
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "dlinklist.h"
25 #include "pthreadpool/pthreadpool.h"
26 #include <fcntl.h>
27
28 /*
29  * This file implements two abstractions: The "unix_dgram" functions implement
30  * queueing for unix domain datagram sockets. You can send to a destination
31  * socket, and if that has no free space available, it will fall back to an
32  * anonymous socket that will poll for writability. "unix_dgram" expects the
33  * data size not to exceed the system limit.
34  *
35  * The "unix_msg" functions implement the fragmentation of large messages on
36  * top of "unix_dgram". This is what is exposed to the user of this API.
37  */
38
39 struct unix_dgram_msg {
40         struct unix_dgram_msg *prev, *next;
41
42         int sock;
43         ssize_t sent;
44         int sys_errno;
45         size_t num_fds;
46         int *fds;
47         struct iovec iov;
48 };
49
50 struct unix_dgram_send_queue {
51         struct unix_dgram_send_queue *prev, *next;
52         struct unix_dgram_ctx *ctx;
53         int sock;
54         struct unix_dgram_msg *msgs;
55         char path[];
56 };
57
58 struct unix_dgram_ctx {
59         int sock;
60         pid_t created_pid;
61         const struct poll_funcs *ev_funcs;
62         size_t max_msg;
63
64         void (*recv_callback)(struct unix_dgram_ctx *ctx,
65                               uint8_t *msg, size_t msg_len,
66                               int *fds, size_t num_fds,
67                               void *private_data);
68         void *private_data;
69
70         struct poll_watch *sock_read_watch;
71         struct unix_dgram_send_queue *send_queues;
72
73         struct pthreadpool *send_pool;
74         struct poll_watch *pool_read_watch;
75
76         uint8_t *recv_buf;
77         char path[];
78 };
79
80 static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
81 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
82                                     void *private_data);
83
84 /* Set socket non blocking. */
85 static int prepare_socket_nonblock(int sock)
86 {
87         int flags;
88 #ifdef O_NONBLOCK
89 #define FLAG_TO_SET O_NONBLOCK
90 #else
91 #ifdef SYSV
92 #define FLAG_TO_SET O_NDELAY
93 #else /* BSD */
94 #define FLAG_TO_SET FNDELAY
95 #endif
96 #endif
97
98         flags = fcntl(sock, F_GETFL);
99         if (flags == -1) {
100                 return errno;
101         }
102         flags |= FLAG_TO_SET;
103         if (fcntl(sock, F_SETFL, flags) == -1) {
104                 return errno;
105         }
106
107 #undef FLAG_TO_SET
108         return 0;
109 }
110
111 /* Set socket close on exec. */
112 static int prepare_socket_cloexec(int sock)
113 {
114 #ifdef FD_CLOEXEC
115         int flags;
116
117         flags = fcntl(sock, F_GETFD, 0);
118         if (flags == -1) {
119                 return errno;
120         }
121         flags |= FD_CLOEXEC;
122         if (fcntl(sock, F_SETFD, flags) == -1) {
123                 return errno;
124         }
125 #endif
126         return 0;
127 }
128
129 /* Set socket non blocking and close on exec. */
130 static int prepare_socket(int sock)
131 {
132         int ret = prepare_socket_nonblock(sock);
133
134         if (ret) {
135                 return ret;
136         }
137         return prepare_socket_cloexec(sock);
138 }
139
140 static void extract_fd_array_from_msghdr(struct msghdr *msg, int **fds,
141                                          size_t *num_fds)
142 {
143 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
144         struct cmsghdr *cmsg;
145
146         for(cmsg = CMSG_FIRSTHDR(msg);
147             cmsg != NULL;
148             cmsg = CMSG_NXTHDR(msg, cmsg))
149         {
150                 void *data = CMSG_DATA(cmsg);
151
152                 if (cmsg->cmsg_type != SCM_RIGHTS) {
153                         continue;
154                 }
155                 if (cmsg->cmsg_level != SOL_SOCKET) {
156                         continue;
157                 }
158
159                 *fds = (int *)data;
160                 *num_fds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof (int);
161                 break;
162         }
163 #endif
164 }
165
166 static void close_fd_array(int *fds, size_t num_fds)
167 {
168         size_t i;
169
170         for (i = 0; i < num_fds; i++) {
171                 if (fds[i] == -1) {
172                         continue;
173                 }
174
175                 close(fds[i]);
176                 fds[i] = -1;
177         }
178 }
179
180 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
181                            const struct poll_funcs *ev_funcs,
182                            void (*recv_callback)(struct unix_dgram_ctx *ctx,
183                                                  uint8_t *msg, size_t msg_len,
184                                                  int *fds, size_t num_fds,
185                                                  void *private_data),
186                            void *private_data,
187                            struct unix_dgram_ctx **result)
188 {
189         struct unix_dgram_ctx *ctx;
190         size_t pathlen;
191         int ret;
192
193         if (addr != NULL) {
194                 pathlen = strlen(addr->sun_path)+1;
195         } else {
196                 pathlen = 1;
197         }
198
199         ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
200         if (ctx == NULL) {
201                 return ENOMEM;
202         }
203         if (addr != NULL) {
204                 memcpy(ctx->path, addr->sun_path, pathlen);
205         } else {
206                 ctx->path[0] = '\0';
207         }
208
209         *ctx = (struct unix_dgram_ctx) {
210                 .max_msg = max_msg,
211                 .ev_funcs = ev_funcs,
212                 .recv_callback = recv_callback,
213                 .private_data = private_data,
214                 .created_pid = (pid_t)-1
215         };
216
217         ctx->recv_buf = malloc(max_msg);
218         if (ctx->recv_buf == NULL) {
219                 free(ctx);
220                 return ENOMEM;
221         }
222
223         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
224         if (ctx->sock == -1) {
225                 ret = errno;
226                 goto fail_free;
227         }
228
229         /* Set non-blocking and close-on-exec. */
230         ret = prepare_socket(ctx->sock);
231         if (ret != 0) {
232                 goto fail_close;
233         }
234
235         if (addr != NULL) {
236                 ret = bind(ctx->sock,
237                            (const struct sockaddr *)(const void *)addr,
238                            sizeof(*addr));
239                 if (ret == -1) {
240                         ret = errno;
241                         goto fail_close;
242                 }
243
244                 ctx->created_pid = getpid();
245
246                 ctx->sock_read_watch = ctx->ev_funcs->watch_new(
247                         ctx->ev_funcs, ctx->sock, POLLIN,
248                         unix_dgram_recv_handler, ctx);
249
250                 if (ctx->sock_read_watch == NULL) {
251                         ret = ENOMEM;
252                         goto fail_close;
253                 }
254         }
255
256         *result = ctx;
257         return 0;
258
259 fail_close:
260         close(ctx->sock);
261 fail_free:
262         free(ctx->recv_buf);
263         free(ctx);
264         return ret;
265 }
266
267 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
268                                     void *private_data)
269 {
270         struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
271         ssize_t received;
272         int flags = 0;
273         struct msghdr msg;
274         struct iovec iov;
275 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
276         char buf[CMSG_SPACE(sizeof(int)*INT8_MAX)] = { 0, };
277 #endif /* HAVE_STRUCT_MSGHDR_MSG_CONTROL */
278         int *fds = NULL;
279         size_t i, num_fds = 0;
280
281         iov = (struct iovec) {
282                 .iov_base = (void *)ctx->recv_buf,
283                 .iov_len = ctx->max_msg,
284         };
285
286         msg = (struct msghdr) {
287                 .msg_iov = &iov,
288                 .msg_iovlen = 1,
289 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
290                 .msg_control = buf,
291                 .msg_controllen = sizeof(buf),
292 #endif
293         };
294
295 #ifdef MSG_CMSG_CLOEXEC
296         flags |= MSG_CMSG_CLOEXEC;
297 #endif
298
299         received = recvmsg(fd, &msg, flags);
300         if (received == -1) {
301                 if ((errno == EAGAIN) ||
302                     (errno == EWOULDBLOCK) ||
303                     (errno == EINTR) || (errno == ENOMEM)) {
304                         /* Not really an error - just try again. */
305                         return;
306                 }
307                 /* Problem with the socket. Set it unreadable. */
308                 ctx->ev_funcs->watch_update(w, 0);
309                 return;
310         }
311         if (received > ctx->max_msg) {
312                 /* More than we expected, not for us */
313                 return;
314         }
315
316         extract_fd_array_from_msghdr(&msg, &fds, &num_fds);
317
318         for (i = 0; i < num_fds; i++) {
319                 int err;
320
321                 err = prepare_socket_cloexec(fds[i]);
322                 if (err != 0) {
323                         goto cleanup_fds;
324                 }
325         }
326
327         ctx->recv_callback(ctx, ctx->recv_buf, received,
328                            fds, num_fds, ctx->private_data);
329         return;
330
331 cleanup_fds:
332         close_fd_array(fds, num_fds);
333
334         ctx->recv_callback(ctx, ctx->recv_buf, received,
335                            NULL, 0, ctx->private_data);
336 }
337
338 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
339                                     void *private_data);
340
341 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
342 {
343         int ret, signalfd;
344
345         if (ctx->send_pool != NULL) {
346                 return 0;
347         }
348
349         ret = pthreadpool_init(0, &ctx->send_pool);
350         if (ret != 0) {
351                 return ret;
352         }
353
354         signalfd = pthreadpool_signal_fd(ctx->send_pool);
355
356         ctx->pool_read_watch = ctx->ev_funcs->watch_new(
357                 ctx->ev_funcs, signalfd, POLLIN,
358                 unix_dgram_job_finished, ctx);
359         if (ctx->pool_read_watch == NULL) {
360                 pthreadpool_destroy(ctx->send_pool);
361                 ctx->send_pool = NULL;
362                 return ENOMEM;
363         }
364
365         return 0;
366 }
367
368 static int unix_dgram_send_queue_init(
369         struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
370         struct unix_dgram_send_queue **result)
371 {
372         struct unix_dgram_send_queue *q;
373         size_t pathlen;
374         int ret, err;
375
376         pathlen = strlen(dst->sun_path)+1;
377
378         q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
379         if (q == NULL) {
380                 return ENOMEM;
381         }
382         q->ctx = ctx;
383         q->msgs = NULL;
384         memcpy(q->path, dst->sun_path, pathlen);
385
386         q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
387         if (q->sock == -1) {
388                 err = errno;
389                 goto fail_free;
390         }
391
392         err = prepare_socket_cloexec(q->sock);
393         if (err != 0) {
394                 goto fail_close;
395         }
396
397         do {
398                 ret = connect(q->sock,
399                               (const struct sockaddr *)(const void *)dst,
400                               sizeof(*dst));
401         } while ((ret == -1) && (errno == EINTR));
402
403         if (ret == -1) {
404                 err = errno;
405                 goto fail_close;
406         }
407
408         err = unix_dgram_init_pthreadpool(ctx);
409         if (err != 0) {
410                 goto fail_close;
411         }
412
413         DLIST_ADD(ctx->send_queues, q);
414
415         *result = q;
416         return 0;
417
418 fail_close:
419         close(q->sock);
420 fail_free:
421         free(q);
422         return err;
423 }
424
425 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
426 {
427         struct unix_dgram_ctx *ctx = q->ctx;
428
429         while (q->msgs != NULL) {
430                 struct unix_dgram_msg *msg;
431                 msg = q->msgs;
432                 DLIST_REMOVE(q->msgs, msg);
433                 close_fd_array(msg->fds, msg->num_fds);
434                 free(msg);
435         }
436         close(q->sock);
437         DLIST_REMOVE(ctx->send_queues, q);
438         free(q);
439 }
440
441 static struct unix_dgram_send_queue *find_send_queue(
442         struct unix_dgram_ctx *ctx, const char *dst_sock)
443 {
444         struct unix_dgram_send_queue *s;
445
446         for (s = ctx->send_queues; s != NULL; s = s->next) {
447                 if (strcmp(s->path, dst_sock) == 0) {
448                         return s;
449                 }
450         }
451         return NULL;
452 }
453
454 static int queue_msg(struct unix_dgram_send_queue *q,
455                      const struct iovec *iov, int iovlen,
456                      const int *fds, size_t num_fds)
457 {
458         struct unix_dgram_msg *msg;
459         ssize_t data_len;
460         uint8_t *data_buf;
461         size_t msglen;
462         size_t fds_size = sizeof(int) * num_fds;
463         int fds_copy[MIN(num_fds, INT8_MAX)];
464         size_t fds_padding = 0;
465         int i;
466         size_t tmp;
467         int ret = -1;
468
469         if (num_fds > INT8_MAX) {
470                 return EINVAL;
471         }
472
473         msglen = sizeof(struct unix_dgram_msg);
474
475         data_len = iov_buflen(iov, iovlen);
476         if (data_len == -1) {
477                 return EINVAL;
478         }
479
480         tmp = msglen + data_len;
481         if ((tmp < msglen) || (tmp < data_len)) {
482                 /* overflow */
483                 return EINVAL;
484         }
485         msglen = tmp;
486
487         if (num_fds > 0) {
488                 const size_t fds_align = sizeof(int) - 1;
489
490                 tmp = msglen + fds_align;
491                 if ((tmp < msglen) || (tmp < fds_align)) {
492                         /* overflow */
493                         return EINVAL;
494                 }
495                 tmp &= ~fds_align;
496
497                 fds_padding = tmp - msglen;
498                 msglen = tmp;
499
500                 tmp = msglen + fds_size;
501                 if ((tmp < msglen) || (tmp < fds_size)) {
502                         /* overflow */
503                         return EINVAL;
504                 }
505                 msglen = tmp;
506         }
507
508         for (i = 0; i < num_fds; i++) {
509                 fds_copy[i] = -1;
510         }
511
512         for (i = 0; i < num_fds; i++) {
513                 fds_copy[i] = dup(fds[i]);
514                 if (fds_copy[i] == -1) {
515                         ret = errno;
516                         goto fail;
517                 }
518         }
519
520         msg = malloc(msglen);
521         if (msg == NULL) {
522                 ret = ENOMEM;
523                 goto fail;
524         }
525
526         msg->sock = q->sock;
527
528         data_buf = (uint8_t *)(msg + 1);
529
530         msg->iov = (struct iovec) {
531                 .iov_base = (void *)data_buf,
532                 .iov_len = data_len,
533         };
534
535         for (i=0; i<iovlen; i++) {
536                 memcpy(data_buf, iov[i].iov_base, iov[i].iov_len);
537                 data_buf += iov[i].iov_len;
538         }
539
540         msg->num_fds = num_fds;
541         if (msg->num_fds > 0) {
542                 void *fds_ptr;
543                 data_buf += fds_padding;
544                 fds_ptr= (void *)data_buf;
545                 memcpy(fds_ptr, fds_copy, fds_size);
546                 msg->fds = (int *)fds_ptr;
547         } else {
548                 msg->fds = NULL;
549         }
550
551         DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
552         return 0;
553
554 fail:
555         close_fd_array(fds_copy, num_fds);
556         return ret;
557 }
558
559 static void unix_dgram_send_job(void *private_data)
560 {
561         struct unix_dgram_msg *dmsg = private_data;
562         struct msghdr msg = {
563                 .msg_iov = &dmsg->iov,
564                 .msg_iovlen = 1,
565         };
566 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
567         struct cmsghdr *cmsg;
568         size_t fds_size = sizeof(int) * dmsg->num_fds;
569         size_t cmsg_len = CMSG_LEN(fds_size);
570         size_t cmsg_space = CMSG_SPACE(fds_size);
571         char cmsg_buf[cmsg_space];
572
573         if (dmsg->num_fds > 0) {
574                 void *fdptr;
575
576                 memset(cmsg_buf, 0, cmsg_space);
577
578                 msg.msg_control = cmsg_buf;
579                 msg.msg_controllen = cmsg_space;
580                 cmsg = CMSG_FIRSTHDR(&msg);
581                 cmsg->cmsg_level = SOL_SOCKET;
582                 cmsg->cmsg_type = SCM_RIGHTS;
583                 cmsg->cmsg_len = cmsg_len;
584                 fdptr = CMSG_DATA(cmsg);
585                 memcpy(fdptr, dmsg->fds, fds_size);
586                 msg.msg_controllen = cmsg->cmsg_len;
587         }
588 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
589
590         do {
591                 dmsg->sent = sendmsg(dmsg->sock, &msg, 0);
592         } while ((dmsg->sent == -1) && (errno == EINTR));
593
594         if (dmsg->sent == -1) {
595                 dmsg->sys_errno = errno;
596         }
597 }
598
599 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
600                                     void *private_data)
601 {
602         struct unix_dgram_ctx *ctx = private_data;
603         struct unix_dgram_send_queue *q;
604         struct unix_dgram_msg *msg;
605         int ret, job;
606
607         ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
608         if (ret != 1) {
609                 return;
610         }
611
612         for (q = ctx->send_queues; q != NULL; q = q->next) {
613                 if (job == q->sock) {
614                         break;
615                 }
616         }
617
618         if (q == NULL) {
619                 /* Huh? Should not happen */
620                 return;
621         }
622
623         msg = q->msgs;
624         DLIST_REMOVE(q->msgs, msg);
625         close_fd_array(msg->fds, msg->num_fds);
626         free(msg);
627
628         if (q->msgs != NULL) {
629                 ret = pthreadpool_add_job(ctx->send_pool, q->sock,
630                                           unix_dgram_send_job, q->msgs);
631                 if (ret == 0) {
632                         return;
633                 }
634         }
635
636         unix_dgram_send_queue_free(q);
637 }
638
639 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
640                            const struct sockaddr_un *dst,
641                            const struct iovec *iov, int iovlen,
642                            const int *fds, size_t num_fds)
643 {
644         struct unix_dgram_send_queue *q;
645         struct msghdr msg;
646 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
647         struct cmsghdr *cmsg;
648         size_t fds_size = sizeof(int) * num_fds;
649         size_t cmsg_len = CMSG_LEN(fds_size);
650         size_t cmsg_space = CMSG_SPACE(fds_size);
651         char cmsg_buf[cmsg_space];
652 #endif /* HAVE_STRUCT_MSGHDR_MSG_CONTROL */
653         int ret;
654         int i;
655
656         if (num_fds > INT8_MAX) {
657                 return EINVAL;
658         }
659
660 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
661         if (num_fds > 0) {
662                 return ENOSYS;
663         }
664 #endif /* ! HAVE_STRUCT_MSGHDR_MSG_CONTROL */
665
666         for (i = 0; i < num_fds; i++) {
667                 /*
668                  * Make sure we only allow fd passing
669                  * for communication channels,
670                  * e.g. sockets, pipes, fifos, ...
671                  */
672                 ret = lseek(fds[i], 0, SEEK_CUR);
673                 if (ret == -1 && errno == ESPIPE) {
674                         /* ok */
675                         continue;
676                 }
677
678                 /*
679                  * Reject the message as we may need to call dup(),
680                  * if we queue the message.
681                  *
682                  * That might result in unexpected behavior for the caller
683                  * for files and broken posix locking.
684                  */
685                 return EINVAL;
686         }
687
688         /*
689          * To preserve message ordering, we have to queue a message when
690          * others are waiting in line already.
691          */
692         q = find_send_queue(ctx, dst->sun_path);
693         if (q != NULL) {
694                 return queue_msg(q, iov, iovlen, fds, num_fds);
695         }
696
697         /*
698          * Try a cheap nonblocking send
699          */
700
701         msg = (struct msghdr) {
702                 .msg_name = discard_const_p(struct sockaddr_un, dst),
703                 .msg_namelen = sizeof(*dst),
704                 .msg_iov = discard_const_p(struct iovec, iov),
705                 .msg_iovlen = iovlen
706         };
707 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
708         if (num_fds > 0) {
709                 void *fdptr;
710
711                 memset(cmsg_buf, 0, cmsg_space);
712
713                 msg.msg_control = cmsg_buf;
714                 msg.msg_controllen = cmsg_space;
715                 cmsg = CMSG_FIRSTHDR(&msg);
716                 cmsg->cmsg_level = SOL_SOCKET;
717                 cmsg->cmsg_type = SCM_RIGHTS;
718                 cmsg->cmsg_len = cmsg_len;
719                 fdptr = CMSG_DATA(cmsg);
720                 memcpy(fdptr, fds, fds_size);
721                 msg.msg_controllen = cmsg->cmsg_len;
722         }
723 #endif /*  HAVE_STRUCT_MSGHDR_MSG_CONTROL */
724
725         ret = sendmsg(ctx->sock, &msg, 0);
726         if (ret >= 0) {
727                 return 0;
728         }
729         if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
730                 return errno;
731         }
732
733         ret = unix_dgram_send_queue_init(ctx, dst, &q);
734         if (ret != 0) {
735                 return ret;
736         }
737         ret = queue_msg(q, iov, iovlen, fds, num_fds);
738         if (ret != 0) {
739                 unix_dgram_send_queue_free(q);
740                 return ret;
741         }
742         ret = pthreadpool_add_job(ctx->send_pool, q->sock,
743                                   unix_dgram_send_job, q->msgs);
744         if (ret != 0) {
745                 unix_dgram_send_queue_free(q);
746                 return ret;
747         }
748         return 0;
749 }
750
751 static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
752 {
753         return ctx->sock;
754 }
755
756 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
757 {
758         if (ctx->send_queues != NULL) {
759                 return EBUSY;
760         }
761
762         if (ctx->send_pool != NULL) {
763                 int ret = pthreadpool_destroy(ctx->send_pool);
764                 if (ret != 0) {
765                         return ret;
766                 }
767                 ctx->ev_funcs->watch_free(ctx->pool_read_watch);
768         }
769
770         ctx->ev_funcs->watch_free(ctx->sock_read_watch);
771
772         if (getpid() == ctx->created_pid) {
773                 /* If we created it, unlink. Otherwise someone else might
774                  * still have it open */
775                 unlink(ctx->path);
776         }
777
778         close(ctx->sock);
779         free(ctx->recv_buf);
780         free(ctx);
781         return 0;
782 }
783
784 /*
785  * Every message starts with a uint64_t cookie.
786  *
787  * A value of 0 indicates a single-fragment message which is complete in
788  * itself. The data immediately follows the cookie.
789  *
790  * Every multi-fragment message has a cookie != 0 and starts with a cookie
791  * followed by a struct unix_msg_header and then the data. The pid and sock
792  * fields are used to assure uniqueness on the receiver side.
793  */
794
795 struct unix_msg_hdr {
796         size_t msglen;
797         pid_t pid;
798         int sock;
799 };
800
801 struct unix_msg {
802         struct unix_msg *prev, *next;
803         size_t msglen;
804         size_t received;
805         pid_t sender_pid;
806         int sender_sock;
807         uint64_t cookie;
808         uint8_t buf[1];
809 };
810
811 struct unix_msg_ctx {
812         struct unix_dgram_ctx *dgram;
813         size_t fragment_len;
814         uint64_t cookie;
815
816         void (*recv_callback)(struct unix_msg_ctx *ctx,
817                               uint8_t *msg, size_t msg_len,
818                               int *fds, size_t num_fds,
819                               void *private_data);
820         void *private_data;
821
822         struct unix_msg *msgs;
823 };
824
825 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
826                           uint8_t *buf, size_t buflen,
827                           int *fds, size_t num_fds,
828                           void *private_data);
829
830 int unix_msg_init(const struct sockaddr_un *addr,
831                   const struct poll_funcs *ev_funcs,
832                   size_t fragment_len, uint64_t cookie,
833                   void (*recv_callback)(struct unix_msg_ctx *ctx,
834                                         uint8_t *msg, size_t msg_len,
835                                         int *fds, size_t num_fds,
836                                         void *private_data),
837                   void *private_data,
838                   struct unix_msg_ctx **result)
839 {
840         struct unix_msg_ctx *ctx;
841         int ret;
842
843         ctx = malloc(sizeof(*ctx));
844         if (ctx == NULL) {
845                 return ENOMEM;
846         }
847
848         *ctx = (struct unix_msg_ctx) {
849                 .fragment_len = fragment_len,
850                 .cookie = cookie,
851                 .recv_callback = recv_callback,
852                 .private_data = private_data
853         };
854
855         ret = unix_dgram_init(addr, fragment_len, ev_funcs,
856                               unix_msg_recv, ctx, &ctx->dgram);
857         if (ret != 0) {
858                 free(ctx);
859                 return ret;
860         }
861
862         *result = ctx;
863         return 0;
864 }
865
866 int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
867                   const struct iovec *iov, int iovlen,
868                   const int *fds, size_t num_fds)
869 {
870         ssize_t msglen;
871         size_t sent;
872         int ret = 0;
873         struct iovec iov_copy[iovlen+2];
874         struct unix_msg_hdr hdr;
875         struct iovec src_iov;
876
877         if (iovlen < 0) {
878                 return EINVAL;
879         }
880
881         msglen = iov_buflen(iov, iovlen);
882         if (msglen == -1) {
883                 return EINVAL;
884         }
885
886 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
887         if (num_fds > 0) {
888                 return ENOSYS;
889         }
890 #endif /* ! HAVE_STRUCT_MSGHDR_MSG_CONTROL */
891
892         if (num_fds > INT8_MAX) {
893                 return EINVAL;
894         }
895
896         if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
897                 uint64_t cookie = 0;
898
899                 iov_copy[0].iov_base = &cookie;
900                 iov_copy[0].iov_len = sizeof(cookie);
901                 if (iovlen > 0) {
902                         memcpy(&iov_copy[1], iov,
903                                sizeof(struct iovec) * iovlen);
904                 }
905
906                 return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
907                                        fds, num_fds);
908         }
909
910         hdr = (struct unix_msg_hdr) {
911                 .msglen = msglen,
912                 .pid = getpid(),
913                 .sock = unix_dgram_sock(ctx->dgram)
914         };
915
916         iov_copy[0].iov_base = &ctx->cookie;
917         iov_copy[0].iov_len = sizeof(ctx->cookie);
918         iov_copy[1].iov_base = &hdr;
919         iov_copy[1].iov_len = sizeof(hdr);
920
921         sent = 0;
922         src_iov = iov[0];
923
924         /*
925          * The following write loop sends the user message in pieces. We have
926          * filled the first two iovecs above with "cookie" and "hdr". In the
927          * following loops we pull message chunks from the user iov array and
928          * fill iov_copy piece by piece, possibly truncating chunks from the
929          * caller's iov array. Ugly, but hopefully efficient.
930          */
931
932         while (sent < msglen) {
933                 size_t fragment_len;
934                 size_t iov_index = 2;
935
936                 fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
937
938                 while (fragment_len < ctx->fragment_len) {
939                         size_t space, chunk;
940
941                         space = ctx->fragment_len - fragment_len;
942                         chunk = MIN(space, src_iov.iov_len);
943
944                         iov_copy[iov_index].iov_base = src_iov.iov_base;
945                         iov_copy[iov_index].iov_len = chunk;
946                         iov_index += 1;
947
948                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
949                         src_iov.iov_len -= chunk;
950                         fragment_len += chunk;
951
952                         if (src_iov.iov_len == 0) {
953                                 iov += 1;
954                                 iovlen -= 1;
955                                 if (iovlen == 0) {
956                                         break;
957                                 }
958                                 src_iov = iov[0];
959                         }
960                 }
961                 sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
962
963                 /*
964                  * only the last fragment should pass the fd array.
965                  * That simplifies the receiver a lot.
966                  */
967                 if (sent < msglen) {
968                         ret = unix_dgram_send(ctx->dgram, dst,
969                                               iov_copy, iov_index,
970                                               NULL, 0);
971                 } else {
972                         ret = unix_dgram_send(ctx->dgram, dst,
973                                               iov_copy, iov_index,
974                                               fds, num_fds);
975                 }
976                 if (ret != 0) {
977                         break;
978                 }
979         }
980
981         ctx->cookie += 1;
982         if (ctx->cookie == 0) {
983                 ctx->cookie += 1;
984         }
985
986         return ret;
987 }
988
989 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
990                           uint8_t *buf, size_t buflen,
991                           int *fds, size_t num_fds,
992                           void *private_data)
993 {
994         struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
995         struct unix_msg_hdr hdr;
996         struct unix_msg *msg;
997         size_t space;
998         uint64_t cookie;
999
1000         if (buflen < sizeof(cookie)) {
1001                 goto close_fds;
1002         }
1003
1004         memcpy(&cookie, buf, sizeof(cookie));
1005
1006         buf += sizeof(cookie);
1007         buflen -= sizeof(cookie);
1008
1009         if (cookie == 0) {
1010                 ctx->recv_callback(ctx, buf, buflen, fds, num_fds, ctx->private_data);
1011                 return;
1012         }
1013
1014         if (buflen < sizeof(hdr)) {
1015                 goto close_fds;
1016         }
1017         memcpy(&hdr, buf, sizeof(hdr));
1018
1019         buf += sizeof(hdr);
1020         buflen -= sizeof(hdr);
1021
1022         for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
1023                 if ((msg->sender_pid == hdr.pid) &&
1024                     (msg->sender_sock == hdr.sock)) {
1025                         break;
1026                 }
1027         }
1028
1029         if ((msg != NULL) && (msg->cookie != cookie)) {
1030                 DLIST_REMOVE(ctx->msgs, msg);
1031                 free(msg);
1032                 msg = NULL;
1033         }
1034
1035         if (msg == NULL) {
1036                 msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
1037                 if (msg == NULL) {
1038                         goto close_fds;
1039                 }
1040                 *msg = (struct unix_msg) {
1041                         .msglen = hdr.msglen,
1042                         .sender_pid = hdr.pid,
1043                         .sender_sock = hdr.sock,
1044                         .cookie = cookie
1045                 };
1046                 DLIST_ADD(ctx->msgs, msg);
1047         }
1048
1049         space = msg->msglen - msg->received;
1050         if (buflen > space) {
1051                 goto close_fds;
1052         }
1053
1054         memcpy(msg->buf + msg->received, buf, buflen);
1055         msg->received += buflen;
1056
1057         if (msg->received < msg->msglen) {
1058                 goto close_fds;
1059         }
1060
1061         DLIST_REMOVE(ctx->msgs, msg);
1062         ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds, ctx->private_data);
1063         free(msg);
1064         return;
1065
1066 close_fds:
1067         close_fd_array(fds, num_fds);
1068 }
1069
1070 int unix_msg_free(struct unix_msg_ctx *ctx)
1071 {
1072         int ret;
1073
1074         ret = unix_dgram_free(ctx->dgram);
1075         if (ret != 0) {
1076                 return ret;
1077         }
1078
1079         while (ctx->msgs != NULL) {
1080                 struct unix_msg *msg = ctx->msgs;
1081                 DLIST_REMOVE(ctx->msgs, msg);
1082                 free(msg);
1083         }
1084
1085         free(ctx);
1086         return 0;
1087 }
1088
1089 static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
1090 {
1091         size_t buflen = 0;
1092         int i;
1093
1094         for (i=0; i<iovlen; i++) {
1095                 size_t thislen = iov[i].iov_len;
1096                 size_t tmp = buflen + thislen;
1097
1098                 if ((tmp < buflen) || (tmp < thislen)) {
1099                         /* overflow */
1100                         return -1;
1101                 }
1102                 buflen = tmp;
1103         }
1104         return buflen;
1105 }