unix_msg: always create a send queue for a peer
authorRalph Boehme <slow@samba.org>
Fri, 19 Aug 2016 07:22:54 +0000 (09:22 +0200)
committerJeremy Allison <jra@samba.org>
Mon, 12 Sep 2016 22:19:27 +0000 (00:19 +0200)
Previously, we only created a send queue for a peer if the initial send
to the non-blocking non-connected socket reported EWOULDBOCK (because
the channel was full).

With this change, we now always create a send queue and use a connected,
non-blocking datagram socket from the beginning.

Initially, the socket of the send queue is set to non-blocking mode and
we attempt a direct send via sendmsg(). If that returns EWOULDBOCK, we
set the send queue to blocking mode and let the threadpool handle the
IO.

When a send queue becomes empty, we set the send queue socket back to
non-blocking.

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/unix_msg/unix_msg.c

index 73d97404af2412849be7a4b4d06fc09a10c533a2..8645c97a2de7fcb10890ebdd3a1470bb79912b8c 100644 (file)
@@ -393,7 +393,7 @@ static int unix_dgram_send_queue_init(
                goto fail_free;
        }
 
-       err = prepare_socket_cloexec(q->sock);
+       err = prepare_socket(q->sock);
        if (err != 0) {
                goto fail_close;
        }
@@ -503,6 +503,7 @@ static int find_send_queue(struct unix_dgram_ctx *ctx,
                           struct unix_dgram_send_queue **ps)
 {
        struct unix_dgram_send_queue *s;
+       int ret;
 
        for (s = ctx->send_queues; s != NULL; s = s->next) {
                if (strcmp(s->path, dst->sun_path) == 0) {
@@ -510,7 +511,12 @@ static int find_send_queue(struct unix_dgram_ctx *ctx,
                        return 0;
                }
        }
-       return ENOENT;
+       ret = unix_dgram_send_queue_init(ctx, dst, &s);
+       if (ret != 0) {
+               return ret;
+       }
+       *ps = s;
+       return 0;
 }
 
 static int queue_msg(struct unix_dgram_send_queue *q,
@@ -622,6 +628,11 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
                }
                return;
        }
+
+       ret = prepare_socket_nonblock(q->sock, true);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+       }
 }
 
 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
@@ -667,12 +678,16 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                return EINVAL;
        }
 
-       /*
-        * To preserve message ordering, we have to queue a message when
-        * others are waiting in line already.
-        */
        ret = find_send_queue(ctx, dst, &q);
-       if (ret == 0) {
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (q->msgs) {
+               /*
+                * To preserve message ordering, we have to queue a
+                * message when others are waiting in line already.
+                */
                return queue_msg(q, iov, iovlen, fds, num_fds);
        }
 
@@ -681,8 +696,6 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
         */
 
        msg = (struct msghdr) {
-               .msg_name = discard_const_p(struct sockaddr_un, dst),
-               .msg_namelen = sizeof(*dst),
                .msg_iov = discard_const_p(struct iovec, iov),
                .msg_iovlen = iovlen
        };
@@ -696,7 +709,7 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                uint8_t buf[fdlen];
                msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
 
-               ret = sendmsg(ctx->sock, &msg, 0);
+               ret = sendmsg(q->sock, &msg, 0);
        }
 
        if (ret >= 0) {
@@ -712,11 +725,20 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                return errno;
        }
 
-       ret = unix_dgram_send_queue_init(ctx, dst, &q);
+       ret = queue_msg(q, iov, iovlen, fds, num_fds);
        if (ret != 0) {
+               unix_dgram_send_queue_free(q);
                return ret;
        }
-       ret = queue_msg(q, iov, iovlen, fds, num_fds);
+
+       /*
+        * While sending the messages via the pthreadpool, we set the
+        * socket back to blocking mode. When the sendqueue becomes
+        * empty and we could attempt direct sends again, the
+        * finished-jobs-handler of the pthreadpool will set it back
+        * to non-blocking.
+        */
+       ret = prepare_socket_nonblock(q->sock, false);
        if (ret != 0) {
                unix_dgram_send_queue_free(q);
                return ret;
@@ -737,8 +759,16 @@ static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
 
 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
 {
-       if (ctx->send_queues != NULL) {
-               return EBUSY;
+       struct unix_dgram_send_queue *q;
+
+       for (q = ctx->send_queues; q != NULL;) {
+               struct unix_dgram_send_queue *q_next = q->next;
+
+               if (q->msgs != NULL) {
+                       return EBUSY;
+               }
+               unix_dgram_send_queue_free(q);
+               q = q_next;
        }
 
        if (ctx->send_pool != NULL) {