unix_msg: Fix unix_dgram_send_queue_init
[metze/samba/wip.git] / source3 / lib / unix_msg / unix_msg.c
index b53a4c65a14ee51382c2d664e12f18018f31618e..5cbf428c894473c5ef1ade2ca00331fc2943abff 100644 (file)
 #include "system/select.h"
 #include "system/time.h"
 #include "system/network.h"
-#include "dlinklist.h"
-#include "pthreadpool/pthreadpool.h"
+#include "lib/util/dlinklist.h"
+#include "pthreadpool/pthreadpool_pipe.h"
+#include "lib/util/iov_buf.h"
+#include "lib/util/msghdr.h"
 #include <fcntl.h>
+#include "lib/util/time.h"
 
 /*
  * This file implements two abstractions: The "unix_dgram" functions implement
@@ -42,8 +45,6 @@ struct unix_dgram_msg {
        int sock;
        ssize_t sent;
        int sys_errno;
-       size_t buflen;
-       uint8_t buf[1];
 };
 
 struct unix_dgram_send_queue {
@@ -51,7 +52,8 @@ struct unix_dgram_send_queue {
        struct unix_dgram_ctx *ctx;
        int sock;
        struct unix_dgram_msg *msgs;
-       char path[1];
+       struct poll_timeout *timeout;
+       char path[];
 };
 
 struct unix_dgram_ctx {
@@ -62,25 +64,25 @@ struct unix_dgram_ctx {
 
        void (*recv_callback)(struct unix_dgram_ctx *ctx,
                              uint8_t *msg, size_t msg_len,
+                             int *fds, size_t num_fds,
                              void *private_data);
        void *private_data;
 
        struct poll_watch *sock_read_watch;
        struct unix_dgram_send_queue *send_queues;
 
-       struct pthreadpool *send_pool;
+       struct pthreadpool_pipe *send_pool;
        struct poll_watch *pool_read_watch;
 
        uint8_t *recv_buf;
-       char path[1];
+       char path[];
 };
 
-static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
                                    void *private_data);
 
 /* Set socket non blocking. */
-static int prepare_socket_nonblock(int sock)
+static int prepare_socket_nonblock(int sock, bool nonblock)
 {
        int flags;
 #ifdef O_NONBLOCK
@@ -97,7 +99,11 @@ static int prepare_socket_nonblock(int sock)
        if (flags == -1) {
                return errno;
        }
-       flags |= FLAG_TO_SET;
+       if (nonblock) {
+               flags |= FLAG_TO_SET;
+       } else {
+               flags &= ~FLAG_TO_SET;
+       }
        if (fcntl(sock, F_SETFL, flags) == -1) {
                return errno;
        }
@@ -127,7 +133,7 @@ static int prepare_socket_cloexec(int sock)
 /* Set socket non blocking and close on exec. */
 static int prepare_socket(int sock)
 {
-       int ret = prepare_socket_nonblock(sock);
+       int ret = prepare_socket_nonblock(sock, true);
 
        if (ret) {
                return ret;
@@ -135,10 +141,53 @@ static int prepare_socket(int sock)
        return prepare_socket_cloexec(sock);
 }
 
+static size_t unix_dgram_msg_size(void)
+{
+       size_t msgsize = sizeof(struct unix_dgram_msg);
+       msgsize = (msgsize + 15) & ~15; /* align to 16 */
+       return msgsize;
+}
+
+static struct msghdr_buf *unix_dgram_msghdr(struct unix_dgram_msg *msg)
+{
+       /*
+        * Not portable in C99, but "msg" is aligned and so is
+        * unix_dgram_msg_size()
+        */
+       return (struct msghdr_buf *)(((char *)msg) + unix_dgram_msg_size());
+}
+
+static void close_fd_array(int *fds, size_t num_fds)
+{
+       size_t i;
+
+       for (i = 0; i < num_fds; i++) {
+               if (fds[i] == -1) {
+                       continue;
+               }
+
+               close(fds[i]);
+               fds[i] = -1;
+       }
+}
+
+static void close_fd_array_dgram_msg(struct unix_dgram_msg *dmsg)
+{
+       struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
+       struct msghdr *msg = msghdr_buf_msghdr(hdr);
+       size_t num_fds = msghdr_extract_fds(msg, NULL, 0);
+       int fds[num_fds];
+
+       msghdr_extract_fds(msg, fds, num_fds);
+
+       close_fd_array(fds, num_fds);
+}
+
 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
                           const struct poll_funcs *ev_funcs,
                           void (*recv_callback)(struct unix_dgram_ctx *ctx,
                                                 uint8_t *msg, size_t msg_len,
+                                                int *fds, size_t num_fds,
                                                 void *private_data),
                           void *private_data,
                           struct unix_dgram_ctx **result)
@@ -163,20 +212,19 @@ static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
                ctx->path[0] = '\0';
        }
 
+       *ctx = (struct unix_dgram_ctx) {
+               .max_msg = max_msg,
+               .ev_funcs = ev_funcs,
+               .recv_callback = recv_callback,
+               .private_data = private_data,
+               .created_pid = (pid_t)-1
+       };
+
        ctx->recv_buf = malloc(max_msg);
        if (ctx->recv_buf == NULL) {
                free(ctx);
                return ENOMEM;
        }
-       ctx->max_msg = max_msg;
-       ctx->ev_funcs = ev_funcs;
-       ctx->recv_callback = recv_callback;
-       ctx->private_data = private_data;
-       ctx->sock_read_watch = NULL;
-       ctx->send_pool = NULL;
-       ctx->pool_read_watch = NULL;
-       ctx->send_queues = NULL;
-       ctx->created_pid = (pid_t)-1;
 
        ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
        if (ctx->sock == -1) {
@@ -227,8 +275,11 @@ static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
 {
        struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
        ssize_t received;
+       int flags = 0;
        struct msghdr msg;
        struct iovec iov;
+       size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
+       uint8_t buf[bufsize];
 
        iov = (struct iovec) {
                .iov_base = (void *)ctx->recv_buf,
@@ -238,18 +289,18 @@ static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
        msg = (struct msghdr) {
                .msg_iov = &iov,
                .msg_iovlen = 1,
-#ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
-               .msg_control = NULL,
-               .msg_controllen = 0,
-#endif
        };
 
-       received = recvmsg(fd, &msg, 0);
+       msghdr_prep_recv_fds(&msg, buf, bufsize, INT8_MAX);
+
+#ifdef MSG_CMSG_CLOEXEC
+       flags |= MSG_CMSG_CLOEXEC;
+#endif
+
+       received = recvmsg(fd, &msg, flags);
        if (received == -1) {
                if ((errno == EAGAIN) ||
-#ifdef EWOULDBLOCK
                    (errno == EWOULDBLOCK) ||
-#endif
                    (errno == EINTR) || (errno == ENOMEM)) {
                        /* Not really an error - just try again. */
                        return;
@@ -262,7 +313,27 @@ static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
                /* More than we expected, not for us */
                return;
        }
-       ctx->recv_callback(ctx, ctx->recv_buf, received, ctx->private_data);
+
+       {
+               size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
+               int fds[num_fds];
+               int i;
+
+               msghdr_extract_fds(&msg, fds, num_fds);
+
+               for (i = 0; i < num_fds; i++) {
+                       int err;
+
+                       err = prepare_socket_cloexec(fds[i]);
+                       if (err != 0) {
+                               close_fd_array(fds, num_fds);
+                               num_fds = 0;
+                       }
+               }
+
+               ctx->recv_callback(ctx, ctx->recv_buf, received,
+                                  fds, num_fds, ctx->private_data);
+       }
 }
 
 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
@@ -276,18 +347,18 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
                return 0;
        }
 
-       ret = pthreadpool_init(0, &ctx->send_pool);
+       ret = pthreadpool_pipe_init(0, &ctx->send_pool);
        if (ret != 0) {
                return ret;
        }
 
-       signalfd = pthreadpool_signal_fd(ctx->send_pool);
+       signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
 
        ctx->pool_read_watch = ctx->ev_funcs->watch_new(
                ctx->ev_funcs, signalfd, POLLIN,
                unix_dgram_job_finished, ctx);
        if (ctx->pool_read_watch == NULL) {
-               pthreadpool_destroy(ctx->send_pool);
+               pthreadpool_pipe_destroy(ctx->send_pool);
                ctx->send_pool = NULL;
                return ENOMEM;
        }
@@ -295,6 +366,8 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
        return 0;
 }
 
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
+
 static int unix_dgram_send_queue_init(
        struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
        struct unix_dgram_send_queue **result)
@@ -311,6 +384,7 @@ static int unix_dgram_send_queue_init(
        }
        q->ctx = ctx;
        q->msgs = NULL;
+       q->timeout = NULL;
        memcpy(q->path, dst->sun_path, pathlen);
 
        q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
@@ -319,7 +393,7 @@ static int unix_dgram_send_queue_init(
                goto fail_free;
        }
 
-       err = prepare_socket_cloexec(q->sock);
+       err = prepare_socket(q->sock);
        if (err != 0) {
                goto fail_close;
        }
@@ -340,6 +414,12 @@ static int unix_dgram_send_queue_init(
                goto fail_close;
        }
 
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               err = ENOMEM;
+               goto fail_close;
+       }
+
        DLIST_ADD(ctx->send_queues, q);
 
        *result = q;
@@ -360,70 +440,153 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
                struct unix_dgram_msg *msg;
                msg = q->msgs;
                DLIST_REMOVE(q->msgs, msg);
+               close_fd_array_dgram_msg(msg);
                free(msg);
        }
        close(q->sock);
        DLIST_REMOVE(ctx->send_queues, q);
+       ctx->ev_funcs->timeout_free(q->timeout);
        free(q);
 }
 
-static struct unix_dgram_send_queue *find_send_queue(
-       struct unix_dgram_ctx *ctx, const char *dst_sock)
+static void unix_dgram_sendq_scheduled_free_handler(
+       struct poll_timeout *t, void *private_data);
+
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
+{
+       struct unix_dgram_ctx *ctx = q->ctx;
+       struct timeval timeout;
+
+       if (q->timeout != NULL) {
+               return 0;
+       }
+
+       GetTimeOfDay(&timeout);
+       timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
+
+       q->timeout = ctx->ev_funcs->timeout_new(
+               ctx->ev_funcs,
+               timeout,
+               unix_dgram_sendq_scheduled_free_handler,
+               q);
+       if (q->timeout == NULL) {
+               return ENOMEM;
+       }
+
+       return 0;
+}
+
+static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
+                                                   void *private_data)
+{
+       struct unix_dgram_send_queue *q = private_data;
+       int ret;
+
+       q->ctx->ev_funcs->timeout_free(q->timeout);
+       q->timeout = NULL;
+
+       if (q->msgs == NULL) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+}
+
+static int find_send_queue(struct unix_dgram_ctx *ctx,
+                          const struct sockaddr_un *dst,
+                          struct unix_dgram_send_queue **ps)
 {
        struct unix_dgram_send_queue *s;
+       int ret;
 
        for (s = ctx->send_queues; s != NULL; s = s->next) {
-               if (strcmp(s->path, dst_sock) == 0) {
-                       return s;
+               if (strcmp(s->path, dst->sun_path) == 0) {
+                       *ps = s;
+                       return 0;
                }
        }
-       return NULL;
+       ret = unix_dgram_send_queue_init(ctx, dst, &s);
+       if (ret != 0) {
+               return ret;
+       }
+       *ps = s;
+       return 0;
 }
 
 static int queue_msg(struct unix_dgram_send_queue *q,
-                    const struct iovec *iov, int iovlen)
+                    const struct iovec *iov, int iovcnt,
+                    const int *fds, size_t num_fds)
 {
        struct unix_dgram_msg *msg;
-       ssize_t buflen;
-       size_t msglen;
-       int i;
+       struct msghdr_buf *hdr;
+       size_t msglen, needed;
+       ssize_t msghdrlen;
+       int fds_copy[MIN(num_fds, INT8_MAX)];
+       int i, ret;
+
+       for (i=0; i<num_fds; i++) {
+               fds_copy[i] = -1;
+       }
 
-       buflen = iov_buflen(iov, iovlen);
-       if (buflen == -1) {
-               return EINVAL;
+       for (i = 0; i < num_fds; i++) {
+               fds_copy[i] = dup(fds[i]);
+               if (fds_copy[i] == -1) {
+                       ret = errno;
+                       goto fail;
+               }
        }
 
-       msglen = offsetof(struct unix_dgram_msg, buf) + buflen;
-       if ((msglen < buflen) ||
-           (msglen < offsetof(struct unix_dgram_msg, buf))) {
-               /* overflow */
-               return EINVAL;
+       msglen = unix_dgram_msg_size();
+
+       msghdrlen = msghdr_copy(NULL, 0, NULL, 0, iov, iovcnt,
+                               fds_copy, num_fds);
+       if (msghdrlen == -1) {
+               ret = EMSGSIZE;
+               goto fail;
        }
 
-       msg = malloc(msglen);
-       if (msg == NULL) {
-               return ENOMEM;
+       needed = msglen + msghdrlen;
+       if (needed < msglen) {
+               ret = EMSGSIZE;
+               goto fail;
        }
-       msg->buflen = buflen;
-       msg->sock = q->sock;
 
-       buflen = 0;
-       for (i=0; i<iovlen; i++) {
-               memcpy(&msg->buf[buflen], iov[i].iov_base, iov[i].iov_len);
-               buflen += iov[i].iov_len;
+       msg = malloc(needed);
+       if (msg == NULL) {
+               ret = ENOMEM;
+               goto fail;
        }
+       hdr = unix_dgram_msghdr(msg);
+
+       msg->sock = q->sock;
+       msghdr_copy(hdr, msghdrlen, NULL, 0, iov, iovcnt,
+                   fds_copy, num_fds);
 
-       DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
+       DLIST_ADD_END(q->msgs, msg);
        return 0;
+fail:
+       close_fd_array(fds_copy, num_fds);
+       return ret;
 }
 
 static void unix_dgram_send_job(void *private_data)
 {
-       struct unix_dgram_msg *msg = private_data;
+       struct unix_dgram_msg *dmsg = private_data;
 
        do {
-               msg->sent = send(msg->sock, msg->buf, msg->buflen, 0);
-       } while ((msg->sent == -1) && (errno == EINTR));
+               struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
+               struct msghdr *msg = msghdr_buf_msghdr(hdr);
+               dmsg->sent = sendmsg(dmsg->sock, msg, 0);
+       } while ((dmsg->sent == -1) && (errno == EINTR));
+
+       if (dmsg->sent == -1) {
+               dmsg->sys_errno = errno;
+       }
 }
 
 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
@@ -434,7 +597,7 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
        struct unix_dgram_msg *msg;
        int ret, job;
 
-       ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+       ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
        if (ret != 1) {
                return;
        }
@@ -452,73 +615,135 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
 
        msg = q->msgs;
        DLIST_REMOVE(q->msgs, msg);
+       close_fd_array_dgram_msg(msg);
        free(msg);
 
        if (q->msgs != NULL) {
-               ret = pthreadpool_add_job(ctx->send_pool, q->sock,
-                                         unix_dgram_send_job, q->msgs);
-               if (ret == 0) {
+               ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+                                              unix_dgram_send_job, q->msgs);
+               if (ret != 0) {
+                       unix_dgram_send_queue_free(q);
                        return;
                }
+               return;
        }
 
-       unix_dgram_send_queue_free(q);
+       ret = prepare_socket_nonblock(q->sock, true);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+       }
 }
 
 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                           const struct sockaddr_un *dst,
-                          const struct iovec *iov, int iovlen)
+                          const struct iovec *iov, int iovlen,
+                          const int *fds, size_t num_fds)
 {
        struct unix_dgram_send_queue *q;
        struct msghdr msg;
+       ssize_t fdlen;
        int ret;
+       int i;
 
-       /*
-        * To preserve message ordering, we have to queue a message when
-        * others are waiting in line already.
-        */
-       q = find_send_queue(ctx, dst->sun_path);
-       if (q != NULL) {
-               return queue_msg(q, iov, iovlen);
+       if (num_fds > INT8_MAX) {
+               return EINVAL;
+       }
+
+#if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
+       if (num_fds > 0) {
+               return ENOSYS;
+       }
+#endif
+
+       for (i = 0; i < num_fds; i++) {
+               /*
+                * Make sure we only allow fd passing
+                * for communication channels,
+                * e.g. sockets, pipes, fifos, ...
+                */
+               ret = lseek(fds[i], 0, SEEK_CUR);
+               if (ret == -1 && errno == ESPIPE) {
+                       /* ok */
+                       continue;
+               }
+
+               /*
+                * Reject the message as we may need to call dup(),
+                * if we queue the message.
+                *
+                * That might result in unexpected behavior for the caller
+                * for files and broken posix locking.
+                */
+               return EINVAL;
+       }
+
+       ret = find_send_queue(ctx, dst, &q);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (q->msgs) {
+               /*
+                * To preserve message ordering, we have to queue a
+                * message when others are waiting in line already.
+                */
+               return queue_msg(q, iov, iovlen, fds, num_fds);
        }
 
        /*
         * Try a cheap nonblocking send
         */
 
-       msg.msg_name = discard_const_p(struct sockaddr_un, dst);
-       msg.msg_namelen = sizeof(*dst);
-       msg.msg_iov = discard_const_p(struct iovec, iov);
-       msg.msg_iovlen = iovlen;
-#ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
-       msg.msg_control = NULL;
-       msg.msg_controllen = 0;
-#endif
-       msg.msg_flags = 0;
+       msg = (struct msghdr) {
+               .msg_iov = discard_const_p(struct iovec, iov),
+               .msg_iovlen = iovlen
+       };
+
+       fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
+       if (fdlen == -1) {
+               return EINVAL;
+       }
+
+       {
+               uint8_t buf[fdlen];
+               msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
+
+               ret = sendmsg(q->sock, &msg, 0);
+       }
 
-       ret = sendmsg(ctx->sock, &msg, 0);
        if (ret >= 0) {
                return 0;
        }
-#ifdef EWOULDBLOCK
-       if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
-#else
-       if ((errno != EAGAIN) && (errno != EINTR)) {
+       if ((errno != EWOULDBLOCK) &&
+           (errno != EAGAIN) &&
+#ifdef ENOBUFS
+           /* FreeBSD can give this for large messages */
+           (errno != ENOBUFS) &&
 #endif
+           (errno != EINTR)) {
                return errno;
        }
 
-       ret = unix_dgram_send_queue_init(ctx, dst, &q);
+       ret = queue_msg(q, iov, iovlen, fds, num_fds);
        if (ret != 0) {
+               unix_dgram_send_queue_free(q);
                return ret;
        }
-       ret = queue_msg(q, iov, iovlen);
+
+       /*
+        * While sending the messages via the pthreadpool, we set the
+        * socket back to blocking mode. When the sendqueue becomes
+        * empty and we could attempt direct sends again, the
+        * finished-jobs-handler of the pthreadpool will set it back
+        * to non-blocking.
+        */
+       ret = prepare_socket_nonblock(q->sock, false);
        if (ret != 0) {
                unix_dgram_send_queue_free(q);
                return ret;
        }
-       ret = pthreadpool_add_job(ctx->send_pool, q->sock,
-                                 unix_dgram_send_job, q->msgs);
+       ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+                                      unix_dgram_send_job, q->msgs);
        if (ret != 0) {
                unix_dgram_send_queue_free(q);
                return ret;
@@ -533,12 +758,20 @@ static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
 
 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
 {
-       if (ctx->send_queues != NULL) {
-               return EBUSY;
+       struct unix_dgram_send_queue *q;
+
+       for (q = ctx->send_queues; q != NULL;) {
+               struct unix_dgram_send_queue *q_next = q->next;
+
+               if (q->msgs != NULL) {
+                       return EBUSY;
+               }
+               unix_dgram_send_queue_free(q);
+               q = q_next;
        }
 
        if (ctx->send_pool != NULL) {
-               int ret = pthreadpool_destroy(ctx->send_pool);
+               int ret = pthreadpool_pipe_destroy(ctx->send_pool);
                if (ret != 0) {
                        return ret;
                }
@@ -547,13 +780,13 @@ static int unix_dgram_free(struct unix_dgram_ctx *ctx)
 
        ctx->ev_funcs->watch_free(ctx->sock_read_watch);
 
+       close(ctx->sock);
        if (getpid() == ctx->created_pid) {
                /* If we created it, unlink. Otherwise someone else might
                 * still have it open */
                unlink(ctx->path);
        }
 
-       close(ctx->sock);
        free(ctx->recv_buf);
        free(ctx);
        return 0;
@@ -593,27 +826,29 @@ struct unix_msg_ctx {
 
        void (*recv_callback)(struct unix_msg_ctx *ctx,
                              uint8_t *msg, size_t msg_len,
+                             int *fds, size_t num_fds,
                              void *private_data);
        void *private_data;
 
        struct unix_msg *msgs;
 };
 
-static void unix_msg_recv(struct unix_dgram_ctx *ctx,
-                         uint8_t *msg, size_t msg_len,
+static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
+                         uint8_t *buf, size_t buflen,
+                         int *fds, size_t num_fds,
                          void *private_data);
 
-int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
-                 size_t fragment_len, uint64_t cookie,
+int unix_msg_init(const struct sockaddr_un *addr,
+                 const struct poll_funcs *ev_funcs,
+                 size_t fragment_len,
                  void (*recv_callback)(struct unix_msg_ctx *ctx,
                                        uint8_t *msg, size_t msg_len,
+                                       int *fds, size_t num_fds,
                                        void *private_data),
                  void *private_data,
                  struct unix_msg_ctx **result)
 {
        struct unix_msg_ctx *ctx;
-       struct sockaddr_un addr;
-       struct sockaddr_un *paddr = NULL;
        int ret;
 
        ctx = malloc(sizeof(*ctx));
@@ -621,52 +856,34 @@ int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
                return ENOMEM;
        }
 
-       if (path != NULL) {
-               size_t pathlen = strlen(path)+1;
-
-               if (pathlen > sizeof(addr.sun_path)) {
-                       return ENAMETOOLONG;
-               }
-               addr = (struct sockaddr_un) { .sun_family = AF_UNIX };
-               memcpy(addr.sun_path, path, pathlen);
-               paddr = &addr;
-       }
+       *ctx = (struct unix_msg_ctx) {
+               .fragment_len = fragment_len,
+               .cookie = 1,
+               .recv_callback = recv_callback,
+               .private_data = private_data
+       };
 
-       ret = unix_dgram_init(paddr, fragment_len, ev_funcs,
+       ret = unix_dgram_init(addr, fragment_len, ev_funcs,
                              unix_msg_recv, ctx, &ctx->dgram);
        if (ret != 0) {
                free(ctx);
                return ret;
        }
 
-       ctx->fragment_len = fragment_len;
-       ctx->cookie = cookie;
-       ctx->recv_callback = recv_callback;
-       ctx->private_data = private_data;
-       ctx->msgs = NULL;
-
        *result = ctx;
        return 0;
 }
 
-int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
-                 const struct iovec *iov, int iovlen)
+int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
+                 const struct iovec *iov, int iovlen,
+                 const int *fds, size_t num_fds)
 {
        ssize_t msglen;
        size_t sent;
        int ret = 0;
-       struct iovec *iov_copy;
+       struct iovec iov_copy[iovlen+2];
        struct unix_msg_hdr hdr;
        struct iovec src_iov;
-       struct sockaddr_un dst;
-       size_t dst_len;
-
-       dst_len = strlen(dst_sock);
-       if (dst_len >= sizeof(dst.sun_path)) {
-               return ENAMETOOLONG;
-       }
-       dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
-       memcpy(dst.sun_path, dst_sock, dst_len);
 
        if (iovlen < 0) {
                return EINVAL;
@@ -677,28 +894,30 @@ int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
                return EINVAL;
        }
 
+       if (num_fds > INT8_MAX) {
+               return EINVAL;
+       }
+
        if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
-               struct iovec tmp_iov[iovlen+1];
                uint64_t cookie = 0;
 
-               tmp_iov[0].iov_base = &cookie;
-               tmp_iov[0].iov_len = sizeof(cookie);
+               iov_copy[0].iov_base = &cookie;
+               iov_copy[0].iov_len = sizeof(cookie);
                if (iovlen > 0) {
-                       memcpy(&tmp_iov[1], iov,
+                       memcpy(&iov_copy[1], iov,
                               sizeof(struct iovec) * iovlen);
                }
 
-               return unix_dgram_send(ctx->dgram, &dst, tmp_iov, iovlen+1);
+               return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
+                                      fds, num_fds);
        }
 
-       hdr.msglen = msglen;
-       hdr.pid = getpid();
-       hdr.sock = unix_dgram_sock(ctx->dgram);
+       hdr = (struct unix_msg_hdr) {
+               .msglen = msglen,
+               .pid = getpid(),
+               .sock = unix_dgram_sock(ctx->dgram)
+       };
 
-       iov_copy = malloc(sizeof(struct iovec) * (iovlen + 2));
-       if (iov_copy == NULL) {
-               return ENOMEM;
-       }
        iov_copy[0].iov_base = &ctx->cookie;
        iov_copy[0].iov_len = sizeof(ctx->cookie);
        iov_copy[1].iov_base = &hdr;
@@ -746,14 +965,24 @@ int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
                }
                sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
 
-               ret = unix_dgram_send(ctx->dgram, &dst, iov_copy, iov_index);
+               /*
+                * only the last fragment should pass the fd array.
+                * That simplifies the receiver a lot.
+                */
+               if (sent < msglen) {
+                       ret = unix_dgram_send(ctx->dgram, dst,
+                                             iov_copy, iov_index,
+                                             NULL, 0);
+               } else {
+                       ret = unix_dgram_send(ctx->dgram, dst,
+                                             iov_copy, iov_index,
+                                             fds, num_fds);
+               }
                if (ret != 0) {
                        break;
                }
        }
 
-       free(iov_copy);
-
        ctx->cookie += 1;
        if (ctx->cookie == 0) {
                ctx->cookie += 1;
@@ -764,6 +993,7 @@ int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
 
 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
                          uint8_t *buf, size_t buflen,
+                         int *fds, size_t num_fds,
                          void *private_data)
 {
        struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
@@ -773,20 +1003,22 @@ static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
        uint64_t cookie;
 
        if (buflen < sizeof(cookie)) {
-               return;
+               goto close_fds;
        }
+
        memcpy(&cookie, buf, sizeof(cookie));
 
        buf += sizeof(cookie);
        buflen -= sizeof(cookie);
 
        if (cookie == 0) {
-               ctx->recv_callback(ctx, buf, buflen, ctx->private_data);
+               ctx->recv_callback(ctx, buf, buflen, fds, num_fds,
+                                  ctx->private_data);
                return;
        }
 
        if (buflen < sizeof(hdr)) {
-               return;
+               goto close_fds;
        }
        memcpy(&hdr, buf, sizeof(hdr));
 
@@ -809,31 +1041,37 @@ static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
        if (msg == NULL) {
                msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
                if (msg == NULL) {
-                       return;
+                       goto close_fds;
                }
-               msg->msglen = hdr.msglen;
-               msg->received = 0;
-               msg->sender_pid = hdr.pid;
-               msg->sender_sock = hdr.sock;
-               msg->cookie = cookie;
+               *msg = (struct unix_msg) {
+                       .msglen = hdr.msglen,
+                       .sender_pid = hdr.pid,
+                       .sender_sock = hdr.sock,
+                       .cookie = cookie
+               };
                DLIST_ADD(ctx->msgs, msg);
        }
 
        space = msg->msglen - msg->received;
        if (buflen > space) {
-               return;
+               goto close_fds;
        }
 
        memcpy(msg->buf + msg->received, buf, buflen);
        msg->received += buflen;
 
        if (msg->received < msg->msglen) {
-               return;
+               goto close_fds;
        }
 
        DLIST_REMOVE(ctx->msgs, msg);
-       ctx->recv_callback(ctx, msg->buf, msg->msglen, ctx->private_data);
+       ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds,
+                          ctx->private_data);
        free(msg);
+       return;
+
+close_fds:
+       close_fd_array(fds, num_fds);
 }
 
 int unix_msg_free(struct unix_msg_ctx *ctx)
@@ -854,21 +1092,3 @@ int unix_msg_free(struct unix_msg_ctx *ctx)
        free(ctx);
        return 0;
 }
-
-static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
-{
-       size_t buflen = 0;
-       int i;
-
-       for (i=0; i<iovlen; i++) {
-               size_t thislen = iov[i].iov_len;
-               size_t tmp = buflen + thislen;
-
-               if ((tmp < buflen) || (tmp < thislen)) {
-                       /* overflow */
-                       return -1;
-               }
-               buflen = tmp;
-       }
-       return buflen;
-}