#include "system/select.h"
#include "system/time.h"
#include "system/network.h"
-#include "dlinklist.h"
-#include "pthreadpool/pthreadpool.h"
+#include "lib/util/dlinklist.h"
+#include "pthreadpool/pthreadpool_pipe.h"
+#include "lib/util/iov_buf.h"
+#include "lib/util/msghdr.h"
#include <fcntl.h>
+#include "lib/util/time.h"
/*
* This file implements two abstractions: The "unix_dgram" functions implement
int sock;
ssize_t sent;
int sys_errno;
- size_t buflen;
- uint8_t buf[1];
};
struct unix_dgram_send_queue {
struct unix_dgram_ctx *ctx;
int sock;
struct unix_dgram_msg *msgs;
- char path[1];
+ struct poll_timeout *timeout;
+ char path[];
};
struct unix_dgram_ctx {
void (*recv_callback)(struct unix_dgram_ctx *ctx,
uint8_t *msg, size_t msg_len,
+ int *fds, size_t num_fds,
void *private_data);
void *private_data;
struct poll_watch *sock_read_watch;
struct unix_dgram_send_queue *send_queues;
- struct pthreadpool *send_pool;
+ struct pthreadpool_pipe *send_pool;
struct poll_watch *pool_read_watch;
uint8_t *recv_buf;
- char path[1];
+ char path[];
};
-static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
void *private_data);
/* Set socket non blocking. */
-static int prepare_socket_nonblock(int sock)
+static int prepare_socket_nonblock(int sock, bool nonblock)
{
int flags;
#ifdef O_NONBLOCK
if (flags == -1) {
return errno;
}
- flags |= FLAG_TO_SET;
+ if (nonblock) {
+ flags |= FLAG_TO_SET;
+ } else {
+ flags &= ~FLAG_TO_SET;
+ }
if (fcntl(sock, F_SETFL, flags) == -1) {
return errno;
}
/* Set socket non blocking and close on exec. */
static int prepare_socket(int sock)
{
- int ret = prepare_socket_nonblock(sock);
+ int ret = prepare_socket_nonblock(sock, true);
if (ret) {
return ret;
return prepare_socket_cloexec(sock);
}
+static size_t unix_dgram_msg_size(void)
+{
+ size_t msgsize = sizeof(struct unix_dgram_msg);
+ msgsize = (msgsize + 15) & ~15; /* align to 16 */
+ return msgsize;
+}
+
+static struct msghdr_buf *unix_dgram_msghdr(struct unix_dgram_msg *msg)
+{
+ /*
+ * Not portable in C99, but "msg" is aligned and so is
+ * unix_dgram_msg_size()
+ */
+ return (struct msghdr_buf *)(((char *)msg) + unix_dgram_msg_size());
+}
+
+static void close_fd_array(int *fds, size_t num_fds)
+{
+ size_t i;
+
+ for (i = 0; i < num_fds; i++) {
+ if (fds[i] == -1) {
+ continue;
+ }
+
+ close(fds[i]);
+ fds[i] = -1;
+ }
+}
+
+static void close_fd_array_dgram_msg(struct unix_dgram_msg *dmsg)
+{
+ struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
+ struct msghdr *msg = msghdr_buf_msghdr(hdr);
+ size_t num_fds = msghdr_extract_fds(msg, NULL, 0);
+ int fds[num_fds];
+
+ msghdr_extract_fds(msg, fds, num_fds);
+
+ close_fd_array(fds, num_fds);
+}
+
static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
const struct poll_funcs *ev_funcs,
void (*recv_callback)(struct unix_dgram_ctx *ctx,
uint8_t *msg, size_t msg_len,
+ int *fds, size_t num_fds,
void *private_data),
void *private_data,
struct unix_dgram_ctx **result)
ctx->path[0] = '\0';
}
+ *ctx = (struct unix_dgram_ctx) {
+ .max_msg = max_msg,
+ .ev_funcs = ev_funcs,
+ .recv_callback = recv_callback,
+ .private_data = private_data,
+ .created_pid = (pid_t)-1
+ };
+
ctx->recv_buf = malloc(max_msg);
if (ctx->recv_buf == NULL) {
free(ctx);
return ENOMEM;
}
- ctx->max_msg = max_msg;
- ctx->ev_funcs = ev_funcs;
- ctx->recv_callback = recv_callback;
- ctx->private_data = private_data;
- ctx->sock_read_watch = NULL;
- ctx->send_pool = NULL;
- ctx->pool_read_watch = NULL;
- ctx->send_queues = NULL;
- ctx->created_pid = (pid_t)-1;
ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
if (ctx->sock == -1) {
{
struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
ssize_t received;
+ int flags = 0;
struct msghdr msg;
struct iovec iov;
+ size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, INT8_MAX);
+ uint8_t buf[bufsize];
iov = (struct iovec) {
.iov_base = (void *)ctx->recv_buf,
msg = (struct msghdr) {
.msg_iov = &iov,
.msg_iovlen = 1,
-#ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
- .msg_control = NULL,
- .msg_controllen = 0,
-#endif
};
- received = recvmsg(fd, &msg, 0);
+ msghdr_prep_recv_fds(&msg, buf, bufsize, INT8_MAX);
+
+#ifdef MSG_CMSG_CLOEXEC
+ flags |= MSG_CMSG_CLOEXEC;
+#endif
+
+ received = recvmsg(fd, &msg, flags);
if (received == -1) {
if ((errno == EAGAIN) ||
-#ifdef EWOULDBLOCK
(errno == EWOULDBLOCK) ||
-#endif
(errno == EINTR) || (errno == ENOMEM)) {
/* Not really an error - just try again. */
return;
/* More than we expected, not for us */
return;
}
- ctx->recv_callback(ctx, ctx->recv_buf, received, ctx->private_data);
+
+ {
+ size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
+ int fds[num_fds];
+ int i;
+
+ msghdr_extract_fds(&msg, fds, num_fds);
+
+ for (i = 0; i < num_fds; i++) {
+ int err;
+
+ err = prepare_socket_cloexec(fds[i]);
+ if (err != 0) {
+ close_fd_array(fds, num_fds);
+ num_fds = 0;
+ }
+ }
+
+ ctx->recv_callback(ctx, ctx->recv_buf, received,
+ fds, num_fds, ctx->private_data);
+ }
}
static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
return 0;
}
- ret = pthreadpool_init(0, &ctx->send_pool);
+ ret = pthreadpool_pipe_init(0, &ctx->send_pool);
if (ret != 0) {
return ret;
}
- signalfd = pthreadpool_signal_fd(ctx->send_pool);
+ signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
ctx->pool_read_watch = ctx->ev_funcs->watch_new(
ctx->ev_funcs, signalfd, POLLIN,
unix_dgram_job_finished, ctx);
if (ctx->pool_read_watch == NULL) {
- pthreadpool_destroy(ctx->send_pool);
+ pthreadpool_pipe_destroy(ctx->send_pool);
ctx->send_pool = NULL;
return ENOMEM;
}
return 0;
}
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
+
static int unix_dgram_send_queue_init(
struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
struct unix_dgram_send_queue **result)
}
q->ctx = ctx;
q->msgs = NULL;
+ q->timeout = NULL;
memcpy(q->path, dst->sun_path, pathlen);
q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
goto fail_free;
}
- err = prepare_socket_cloexec(q->sock);
+ err = prepare_socket(q->sock);
if (err != 0) {
goto fail_close;
}
goto fail_close;
}
+ ret = unix_dgram_sendq_schedule_free(q);
+ if (ret != 0) {
+ err = ENOMEM;
+ goto fail_close;
+ }
+
DLIST_ADD(ctx->send_queues, q);
*result = q;
struct unix_dgram_msg *msg;
msg = q->msgs;
DLIST_REMOVE(q->msgs, msg);
+ close_fd_array_dgram_msg(msg);
free(msg);
}
close(q->sock);
DLIST_REMOVE(ctx->send_queues, q);
+ ctx->ev_funcs->timeout_free(q->timeout);
free(q);
}
-static struct unix_dgram_send_queue *find_send_queue(
- struct unix_dgram_ctx *ctx, const char *dst_sock)
+static void unix_dgram_sendq_scheduled_free_handler(
+ struct poll_timeout *t, void *private_data);
+
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
+{
+ struct unix_dgram_ctx *ctx = q->ctx;
+ struct timeval timeout;
+
+ if (q->timeout != NULL) {
+ return 0;
+ }
+
+ GetTimeOfDay(&timeout);
+ timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
+
+ q->timeout = ctx->ev_funcs->timeout_new(
+ ctx->ev_funcs,
+ timeout,
+ unix_dgram_sendq_scheduled_free_handler,
+ q);
+ if (q->timeout == NULL) {
+ return ENOMEM;
+ }
+
+ return 0;
+}
+
+static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
+ void *private_data)
+{
+ struct unix_dgram_send_queue *q = private_data;
+ int ret;
+
+ q->ctx->ev_funcs->timeout_free(q->timeout);
+ q->timeout = NULL;
+
+ if (q->msgs == NULL) {
+ unix_dgram_send_queue_free(q);
+ return;
+ }
+
+ ret = unix_dgram_sendq_schedule_free(q);
+ if (ret != 0) {
+ unix_dgram_send_queue_free(q);
+ return;
+ }
+}
+
+static int find_send_queue(struct unix_dgram_ctx *ctx,
+ const struct sockaddr_un *dst,
+ struct unix_dgram_send_queue **ps)
{
struct unix_dgram_send_queue *s;
+ int ret;
for (s = ctx->send_queues; s != NULL; s = s->next) {
- if (strcmp(s->path, dst_sock) == 0) {
- return s;
+ if (strcmp(s->path, dst->sun_path) == 0) {
+ *ps = s;
+ return 0;
}
}
- return NULL;
+ ret = unix_dgram_send_queue_init(ctx, dst, &s);
+ if (ret != 0) {
+ return ret;
+ }
+ *ps = s;
+ return 0;
}
static int queue_msg(struct unix_dgram_send_queue *q,
- const struct iovec *iov, int iovlen)
+ const struct iovec *iov, int iovcnt,
+ const int *fds, size_t num_fds)
{
struct unix_dgram_msg *msg;
- ssize_t buflen;
- size_t msglen;
- int i;
+ struct msghdr_buf *hdr;
+ size_t msglen, needed;
+ ssize_t msghdrlen;
+ int fds_copy[MIN(num_fds, INT8_MAX)];
+ int i, ret;
+
+ for (i=0; i<num_fds; i++) {
+ fds_copy[i] = -1;
+ }
- buflen = iov_buflen(iov, iovlen);
- if (buflen == -1) {
- return EINVAL;
+ for (i = 0; i < num_fds; i++) {
+ fds_copy[i] = dup(fds[i]);
+ if (fds_copy[i] == -1) {
+ ret = errno;
+ goto fail;
+ }
}
- msglen = offsetof(struct unix_dgram_msg, buf) + buflen;
- if ((msglen < buflen) ||
- (msglen < offsetof(struct unix_dgram_msg, buf))) {
- /* overflow */
- return EINVAL;
+ msglen = unix_dgram_msg_size();
+
+ msghdrlen = msghdr_copy(NULL, 0, NULL, 0, iov, iovcnt,
+ fds_copy, num_fds);
+ if (msghdrlen == -1) {
+ ret = EMSGSIZE;
+ goto fail;
}
- msg = malloc(msglen);
- if (msg == NULL) {
- return ENOMEM;
+ needed = msglen + msghdrlen;
+ if (needed < msglen) {
+ ret = EMSGSIZE;
+ goto fail;
}
- msg->buflen = buflen;
- msg->sock = q->sock;
- buflen = 0;
- for (i=0; i<iovlen; i++) {
- memcpy(&msg->buf[buflen], iov[i].iov_base, iov[i].iov_len);
- buflen += iov[i].iov_len;
+ msg = malloc(needed);
+ if (msg == NULL) {
+ ret = ENOMEM;
+ goto fail;
}
+ hdr = unix_dgram_msghdr(msg);
+
+ msg->sock = q->sock;
+ msghdr_copy(hdr, msghdrlen, NULL, 0, iov, iovcnt,
+ fds_copy, num_fds);
- DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
+ DLIST_ADD_END(q->msgs, msg);
return 0;
+fail:
+ close_fd_array(fds_copy, num_fds);
+ return ret;
}
static void unix_dgram_send_job(void *private_data)
{
- struct unix_dgram_msg *msg = private_data;
+ struct unix_dgram_msg *dmsg = private_data;
do {
- msg->sent = send(msg->sock, msg->buf, msg->buflen, 0);
- } while ((msg->sent == -1) && (errno == EINTR));
+ struct msghdr_buf *hdr = unix_dgram_msghdr(dmsg);
+ struct msghdr *msg = msghdr_buf_msghdr(hdr);
+ dmsg->sent = sendmsg(dmsg->sock, msg, 0);
+ } while ((dmsg->sent == -1) && (errno == EINTR));
+
+ if (dmsg->sent == -1) {
+ dmsg->sys_errno = errno;
+ }
}
static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
struct unix_dgram_msg *msg;
int ret, job;
- ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+ ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
if (ret != 1) {
return;
}
msg = q->msgs;
DLIST_REMOVE(q->msgs, msg);
+ close_fd_array_dgram_msg(msg);
free(msg);
if (q->msgs != NULL) {
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
- if (ret == 0) {
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
+ if (ret != 0) {
+ unix_dgram_send_queue_free(q);
return;
}
+ return;
}
- unix_dgram_send_queue_free(q);
+ ret = prepare_socket_nonblock(q->sock, true);
+ if (ret != 0) {
+ unix_dgram_send_queue_free(q);
+ }
}
static int unix_dgram_send(struct unix_dgram_ctx *ctx,
const struct sockaddr_un *dst,
- const struct iovec *iov, int iovlen)
+ const struct iovec *iov, int iovlen,
+ const int *fds, size_t num_fds)
{
struct unix_dgram_send_queue *q;
struct msghdr msg;
+ ssize_t fdlen;
int ret;
+ int i;
- /*
- * To preserve message ordering, we have to queue a message when
- * others are waiting in line already.
- */
- q = find_send_queue(ctx, dst->sun_path);
- if (q != NULL) {
- return queue_msg(q, iov, iovlen);
+ if (num_fds > INT8_MAX) {
+ return EINVAL;
+ }
+
+#if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
+ if (num_fds > 0) {
+ return ENOSYS;
+ }
+#endif
+
+ for (i = 0; i < num_fds; i++) {
+ /*
+ * Make sure we only allow fd passing
+ * for communication channels,
+ * e.g. sockets, pipes, fifos, ...
+ */
+ ret = lseek(fds[i], 0, SEEK_CUR);
+ if (ret == -1 && errno == ESPIPE) {
+ /* ok */
+ continue;
+ }
+
+ /*
+ * Reject the message as we may need to call dup(),
+ * if we queue the message.
+ *
+ * That might result in unexpected behavior for the caller
+ * for files and broken posix locking.
+ */
+ return EINVAL;
+ }
+
+ ret = find_send_queue(ctx, dst, &q);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (q->msgs) {
+ /*
+ * To preserve message ordering, we have to queue a
+ * message when others are waiting in line already.
+ */
+ return queue_msg(q, iov, iovlen, fds, num_fds);
}
/*
* Try a cheap nonblocking send
*/
- msg.msg_name = discard_const_p(struct sockaddr_un, dst);
- msg.msg_namelen = sizeof(*dst);
- msg.msg_iov = discard_const_p(struct iovec, iov);
- msg.msg_iovlen = iovlen;
-#ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
- msg.msg_control = NULL;
- msg.msg_controllen = 0;
-#endif
- msg.msg_flags = 0;
+ msg = (struct msghdr) {
+ .msg_iov = discard_const_p(struct iovec, iov),
+ .msg_iovlen = iovlen
+ };
+
+ fdlen = msghdr_prep_fds(&msg, NULL, 0, fds, num_fds);
+ if (fdlen == -1) {
+ return EINVAL;
+ }
+
+ {
+ uint8_t buf[fdlen];
+ msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
+
+ ret = sendmsg(q->sock, &msg, 0);
+ }
- ret = sendmsg(ctx->sock, &msg, 0);
if (ret >= 0) {
return 0;
}
-#ifdef EWOULDBLOCK
- if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
-#else
- if ((errno != EAGAIN) && (errno != EINTR)) {
+ if ((errno != EWOULDBLOCK) &&
+ (errno != EAGAIN) &&
+#ifdef ENOBUFS
+ /* FreeBSD can give this for large messages */
+ (errno != ENOBUFS) &&
#endif
+ (errno != EINTR)) {
return errno;
}
- ret = unix_dgram_send_queue_init(ctx, dst, &q);
+ ret = queue_msg(q, iov, iovlen, fds, num_fds);
if (ret != 0) {
+ unix_dgram_send_queue_free(q);
return ret;
}
- ret = queue_msg(q, iov, iovlen);
+
+ /*
+ * While sending the messages via the pthreadpool, we set the
+ * socket back to blocking mode. When the sendqueue becomes
+ * empty and we could attempt direct sends again, the
+ * finished-jobs-handler of the pthreadpool will set it back
+ * to non-blocking.
+ */
+ ret = prepare_socket_nonblock(q->sock, false);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return ret;
}
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return ret;
static int unix_dgram_free(struct unix_dgram_ctx *ctx)
{
- if (ctx->send_queues != NULL) {
- return EBUSY;
+ struct unix_dgram_send_queue *q;
+
+ for (q = ctx->send_queues; q != NULL;) {
+ struct unix_dgram_send_queue *q_next = q->next;
+
+ if (q->msgs != NULL) {
+ return EBUSY;
+ }
+ unix_dgram_send_queue_free(q);
+ q = q_next;
}
if (ctx->send_pool != NULL) {
- int ret = pthreadpool_destroy(ctx->send_pool);
+ int ret = pthreadpool_pipe_destroy(ctx->send_pool);
if (ret != 0) {
return ret;
}
ctx->ev_funcs->watch_free(ctx->sock_read_watch);
+ close(ctx->sock);
if (getpid() == ctx->created_pid) {
/* If we created it, unlink. Otherwise someone else might
* still have it open */
unlink(ctx->path);
}
- close(ctx->sock);
free(ctx->recv_buf);
free(ctx);
return 0;
void (*recv_callback)(struct unix_msg_ctx *ctx,
uint8_t *msg, size_t msg_len,
+ int *fds, size_t num_fds,
void *private_data);
void *private_data;
struct unix_msg *msgs;
};
-static void unix_msg_recv(struct unix_dgram_ctx *ctx,
- uint8_t *msg, size_t msg_len,
+static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
+ uint8_t *buf, size_t buflen,
+ int *fds, size_t num_fds,
void *private_data);
-int unix_msg_init(const char *path, const struct poll_funcs *ev_funcs,
- size_t fragment_len, uint64_t cookie,
+int unix_msg_init(const struct sockaddr_un *addr,
+ const struct poll_funcs *ev_funcs,
+ size_t fragment_len,
void (*recv_callback)(struct unix_msg_ctx *ctx,
uint8_t *msg, size_t msg_len,
+ int *fds, size_t num_fds,
void *private_data),
void *private_data,
struct unix_msg_ctx **result)
{
struct unix_msg_ctx *ctx;
- struct sockaddr_un addr;
- struct sockaddr_un *paddr = NULL;
int ret;
ctx = malloc(sizeof(*ctx));
return ENOMEM;
}
- if (path != NULL) {
- size_t pathlen = strlen(path)+1;
-
- if (pathlen > sizeof(addr.sun_path)) {
- return ENAMETOOLONG;
- }
- addr = (struct sockaddr_un) { .sun_family = AF_UNIX };
- memcpy(addr.sun_path, path, pathlen);
- paddr = &addr;
- }
+ *ctx = (struct unix_msg_ctx) {
+ .fragment_len = fragment_len,
+ .cookie = 1,
+ .recv_callback = recv_callback,
+ .private_data = private_data
+ };
- ret = unix_dgram_init(paddr, fragment_len, ev_funcs,
+ ret = unix_dgram_init(addr, fragment_len, ev_funcs,
unix_msg_recv, ctx, &ctx->dgram);
if (ret != 0) {
free(ctx);
return ret;
}
- ctx->fragment_len = fragment_len;
- ctx->cookie = cookie;
- ctx->recv_callback = recv_callback;
- ctx->private_data = private_data;
- ctx->msgs = NULL;
-
*result = ctx;
return 0;
}
-int unix_msg_send(struct unix_msg_ctx *ctx, const char *dst_sock,
- const struct iovec *iov, int iovlen)
+int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
+ const struct iovec *iov, int iovlen,
+ const int *fds, size_t num_fds)
{
ssize_t msglen;
size_t sent;
int ret = 0;
- struct iovec *iov_copy;
+ struct iovec iov_copy[iovlen+2];
struct unix_msg_hdr hdr;
struct iovec src_iov;
- struct sockaddr_un dst;
- size_t dst_len;
-
- dst_len = strlen(dst_sock);
- if (dst_len >= sizeof(dst.sun_path)) {
- return ENAMETOOLONG;
- }
- dst = (struct sockaddr_un) { .sun_family = AF_UNIX };
- memcpy(dst.sun_path, dst_sock, dst_len);
if (iovlen < 0) {
return EINVAL;
return EINVAL;
}
+ if (num_fds > INT8_MAX) {
+ return EINVAL;
+ }
+
if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
- struct iovec tmp_iov[iovlen+1];
uint64_t cookie = 0;
- tmp_iov[0].iov_base = &cookie;
- tmp_iov[0].iov_len = sizeof(cookie);
+ iov_copy[0].iov_base = &cookie;
+ iov_copy[0].iov_len = sizeof(cookie);
if (iovlen > 0) {
- memcpy(&tmp_iov[1], iov,
+ memcpy(&iov_copy[1], iov,
sizeof(struct iovec) * iovlen);
}
- return unix_dgram_send(ctx->dgram, &dst, tmp_iov, iovlen+1);
+ return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1,
+ fds, num_fds);
}
- hdr.msglen = msglen;
- hdr.pid = getpid();
- hdr.sock = unix_dgram_sock(ctx->dgram);
+ hdr = (struct unix_msg_hdr) {
+ .msglen = msglen,
+ .pid = getpid(),
+ .sock = unix_dgram_sock(ctx->dgram)
+ };
- iov_copy = malloc(sizeof(struct iovec) * (iovlen + 2));
- if (iov_copy == NULL) {
- return ENOMEM;
- }
iov_copy[0].iov_base = &ctx->cookie;
iov_copy[0].iov_len = sizeof(ctx->cookie);
iov_copy[1].iov_base = &hdr;
}
sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
- ret = unix_dgram_send(ctx->dgram, &dst, iov_copy, iov_index);
+ /*
+ * only the last fragment should pass the fd array.
+ * That simplifies the receiver a lot.
+ */
+ if (sent < msglen) {
+ ret = unix_dgram_send(ctx->dgram, dst,
+ iov_copy, iov_index,
+ NULL, 0);
+ } else {
+ ret = unix_dgram_send(ctx->dgram, dst,
+ iov_copy, iov_index,
+ fds, num_fds);
+ }
if (ret != 0) {
break;
}
}
- free(iov_copy);
-
ctx->cookie += 1;
if (ctx->cookie == 0) {
ctx->cookie += 1;
static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
uint8_t *buf, size_t buflen,
+ int *fds, size_t num_fds,
void *private_data)
{
struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
uint64_t cookie;
if (buflen < sizeof(cookie)) {
- return;
+ goto close_fds;
}
+
memcpy(&cookie, buf, sizeof(cookie));
buf += sizeof(cookie);
buflen -= sizeof(cookie);
if (cookie == 0) {
- ctx->recv_callback(ctx, buf, buflen, ctx->private_data);
+ ctx->recv_callback(ctx, buf, buflen, fds, num_fds,
+ ctx->private_data);
return;
}
if (buflen < sizeof(hdr)) {
- return;
+ goto close_fds;
}
memcpy(&hdr, buf, sizeof(hdr));
if (msg == NULL) {
msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
if (msg == NULL) {
- return;
+ goto close_fds;
}
- msg->msglen = hdr.msglen;
- msg->received = 0;
- msg->sender_pid = hdr.pid;
- msg->sender_sock = hdr.sock;
- msg->cookie = cookie;
+ *msg = (struct unix_msg) {
+ .msglen = hdr.msglen,
+ .sender_pid = hdr.pid,
+ .sender_sock = hdr.sock,
+ .cookie = cookie
+ };
DLIST_ADD(ctx->msgs, msg);
}
space = msg->msglen - msg->received;
if (buflen > space) {
- return;
+ goto close_fds;
}
memcpy(msg->buf + msg->received, buf, buflen);
msg->received += buflen;
if (msg->received < msg->msglen) {
- return;
+ goto close_fds;
}
DLIST_REMOVE(ctx->msgs, msg);
- ctx->recv_callback(ctx, msg->buf, msg->msglen, ctx->private_data);
+ ctx->recv_callback(ctx, msg->buf, msg->msglen, fds, num_fds,
+ ctx->private_data);
free(msg);
+ return;
+
+close_fds:
+ close_fd_array(fds, num_fds);
}
int unix_msg_free(struct unix_msg_ctx *ctx)
free(ctx);
return 0;
}
-
-static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
-{
- size_t buflen = 0;
- int i;
-
- for (i=0; i<iovlen; i++) {
- size_t thislen = iov[i].iov_len;
- size_t tmp = buflen + thislen;
-
- if ((tmp < buflen) || (tmp < thislen)) {
- /* overflow */
- return -1;
- }
- buflen = tmp;
- }
- return buflen;
-}