From 95f3d9bb497c9e18bbdead25f6abf485014ba769 Mon Sep 17 00:00:00 2001 From: Ralph Boehme Date: Fri, 19 Aug 2016 09:22:54 +0200 Subject: [PATCH] unix_msg: always create a send queue for a peer Previously, we only created a send queue for a peer if the initial send to the non-blocking non-connected socket reported EWOULDBOCK (because the channel was full). With this change, we now always create a send queue and use a connected, non-blocking datagram socket from the beginning. Initially, the socket of the send queue is set to non-blocking mode and we attempt a direct send via sendmsg(). If that returns EWOULDBOCK, we set the send queue to blocking mode and let the threadpool handle the IO. When a send queue becomes empty, we set the send queue socket back to non-blocking. Signed-off-by: Ralph Boehme Reviewed-by: Jeremy Allison --- source3/lib/unix_msg/unix_msg.c | 58 +++++++++++++++++++++++++-------- 1 file changed, 44 insertions(+), 14 deletions(-) diff --git a/source3/lib/unix_msg/unix_msg.c b/source3/lib/unix_msg/unix_msg.c index 73d97404af24..8645c97a2de7 100644 --- a/source3/lib/unix_msg/unix_msg.c +++ b/source3/lib/unix_msg/unix_msg.c @@ -393,7 +393,7 @@ static int unix_dgram_send_queue_init( goto fail_free; } - err = prepare_socket_cloexec(q->sock); + err = prepare_socket(q->sock); if (err != 0) { goto fail_close; } @@ -503,6 +503,7 @@ static int find_send_queue(struct unix_dgram_ctx *ctx, struct unix_dgram_send_queue **ps) { struct unix_dgram_send_queue *s; + int ret; for (s = ctx->send_queues; s != NULL; s = s->next) { if (strcmp(s->path, dst->sun_path) == 0) { @@ -510,7 +511,12 @@ static int find_send_queue(struct unix_dgram_ctx *ctx, return 0; } } - return ENOENT; + ret = unix_dgram_send_queue_init(ctx, dst, &s); + if (ret != 0) { + return ret; + } + *ps = s; + return 0; } static int queue_msg(struct unix_dgram_send_queue *q, @@ -622,6 +628,11 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events, } return; } + + ret = prepare_socket_nonblock(q->sock, true); + if (ret != 0) { + unix_dgram_send_queue_free(q); + } } static int unix_dgram_send(struct unix_dgram_ctx *ctx, @@ -667,12 +678,16 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx, return EINVAL; } - /* - * To preserve message ordering, we have to queue a message when - * others are waiting in line already. - */ ret = find_send_queue(ctx, dst, &q); - if (ret == 0) { + if (ret != 0) { + return ret; + } + + if (q->msgs) { + /* + * To preserve message ordering, we have to queue a + * message when others are waiting in line already. + */ return queue_msg(q, iov, iovlen, fds, num_fds); } @@ -681,8 +696,6 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx, */ msg = (struct msghdr) { - .msg_name = discard_const_p(struct sockaddr_un, dst), - .msg_namelen = sizeof(*dst), .msg_iov = discard_const_p(struct iovec, iov), .msg_iovlen = iovlen }; @@ -696,7 +709,7 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx, uint8_t buf[fdlen]; msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds); - ret = sendmsg(ctx->sock, &msg, 0); + ret = sendmsg(q->sock, &msg, 0); } if (ret >= 0) { @@ -712,11 +725,20 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx, return errno; } - ret = unix_dgram_send_queue_init(ctx, dst, &q); + ret = queue_msg(q, iov, iovlen, fds, num_fds); if (ret != 0) { + unix_dgram_send_queue_free(q); return ret; } - ret = queue_msg(q, iov, iovlen, fds, num_fds); + + /* + * While sending the messages via the pthreadpool, we set the + * socket back to blocking mode. When the sendqueue becomes + * empty and we could attempt direct sends again, the + * finished-jobs-handler of the pthreadpool will set it back + * to non-blocking. + */ + ret = prepare_socket_nonblock(q->sock, false); if (ret != 0) { unix_dgram_send_queue_free(q); return ret; @@ -737,8 +759,16 @@ static int unix_dgram_sock(struct unix_dgram_ctx *ctx) static int unix_dgram_free(struct unix_dgram_ctx *ctx) { - if (ctx->send_queues != NULL) { - return EBUSY; + struct unix_dgram_send_queue *q; + + for (q = ctx->send_queues; q != NULL;) { + struct unix_dgram_send_queue *q_next = q->next; + + if (q->msgs != NULL) { + return EBUSY; + } + unix_dgram_send_queue_free(q); + q = q_next; } if (ctx->send_pool != NULL) { -- 2.34.1