unix_msg: Fix unix_dgram_send_queue_init
[metze/samba/wip.git] / source3 / lib / unix_msg / unix_msg.c
index 17047ae9b094462eb0ad14f054dcbdb01605a15b..5cbf428c894473c5ef1ade2ca00331fc2943abff 100644 (file)
@@ -26,6 +26,7 @@
 #include "lib/util/iov_buf.h"
 #include "lib/util/msghdr.h"
 #include <fcntl.h>
+#include "lib/util/time.h"
 
 /*
  * This file implements two abstractions: The "unix_dgram" functions implement
@@ -51,6 +52,7 @@ struct unix_dgram_send_queue {
        struct unix_dgram_ctx *ctx;
        int sock;
        struct unix_dgram_msg *msgs;
+       struct poll_timeout *timeout;
        char path[];
 };
 
@@ -80,7 +82,7 @@ static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
                                    void *private_data);
 
 /* Set socket non blocking. */
-static int prepare_socket_nonblock(int sock)
+static int prepare_socket_nonblock(int sock, bool nonblock)
 {
        int flags;
 #ifdef O_NONBLOCK
@@ -97,7 +99,11 @@ static int prepare_socket_nonblock(int sock)
        if (flags == -1) {
                return errno;
        }
-       flags |= FLAG_TO_SET;
+       if (nonblock) {
+               flags |= FLAG_TO_SET;
+       } else {
+               flags &= ~FLAG_TO_SET;
+       }
        if (fcntl(sock, F_SETFL, flags) == -1) {
                return errno;
        }
@@ -127,7 +133,7 @@ static int prepare_socket_cloexec(int sock)
 /* Set socket non blocking and close on exec. */
 static int prepare_socket(int sock)
 {
-       int ret = prepare_socket_nonblock(sock);
+       int ret = prepare_socket_nonblock(sock, true);
 
        if (ret) {
                return ret;
@@ -360,6 +366,8 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
        return 0;
 }
 
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
+
 static int unix_dgram_send_queue_init(
        struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
        struct unix_dgram_send_queue **result)
@@ -376,6 +384,7 @@ static int unix_dgram_send_queue_init(
        }
        q->ctx = ctx;
        q->msgs = NULL;
+       q->timeout = NULL;
        memcpy(q->path, dst->sun_path, pathlen);
 
        q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
@@ -384,7 +393,7 @@ static int unix_dgram_send_queue_init(
                goto fail_free;
        }
 
-       err = prepare_socket_cloexec(q->sock);
+       err = prepare_socket(q->sock);
        if (err != 0) {
                goto fail_close;
        }
@@ -405,6 +414,12 @@ static int unix_dgram_send_queue_init(
                goto fail_close;
        }
 
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               err = ENOMEM;
+               goto fail_close;
+       }
+
        DLIST_ADD(ctx->send_queues, q);
 
        *result = q;
@@ -430,14 +445,64 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
        }
        close(q->sock);
        DLIST_REMOVE(ctx->send_queues, q);
+       ctx->ev_funcs->timeout_free(q->timeout);
        free(q);
 }
 
+static void unix_dgram_sendq_scheduled_free_handler(
+       struct poll_timeout *t, void *private_data);
+
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
+{
+       struct unix_dgram_ctx *ctx = q->ctx;
+       struct timeval timeout;
+
+       if (q->timeout != NULL) {
+               return 0;
+       }
+
+       GetTimeOfDay(&timeout);
+       timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
+
+       q->timeout = ctx->ev_funcs->timeout_new(
+               ctx->ev_funcs,
+               timeout,
+               unix_dgram_sendq_scheduled_free_handler,
+               q);
+       if (q->timeout == NULL) {
+               return ENOMEM;
+       }
+
+       return 0;
+}
+
+static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
+                                                   void *private_data)
+{
+       struct unix_dgram_send_queue *q = private_data;
+       int ret;
+
+       q->ctx->ev_funcs->timeout_free(q->timeout);
+       q->timeout = NULL;
+
+       if (q->msgs == NULL) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+}
+
 static int find_send_queue(struct unix_dgram_ctx *ctx,
                           const struct sockaddr_un *dst,
                           struct unix_dgram_send_queue **ps)
 {
        struct unix_dgram_send_queue *s;
+       int ret;
 
        for (s = ctx->send_queues; s != NULL; s = s->next) {
                if (strcmp(s->path, dst->sun_path) == 0) {
@@ -445,7 +510,12 @@ static int find_send_queue(struct unix_dgram_ctx *ctx,
                        return 0;
                }
        }
-       return ENOENT;
+       ret = unix_dgram_send_queue_init(ctx, dst, &s);
+       if (ret != 0) {
+               return ret;
+       }
+       *ps = s;
+       return 0;
 }
 
 static int queue_msg(struct unix_dgram_send_queue *q,
@@ -551,12 +621,17 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
        if (q->msgs != NULL) {
                ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
                                               unix_dgram_send_job, q->msgs);
-               if (ret == 0) {
+               if (ret != 0) {
+                       unix_dgram_send_queue_free(q);
                        return;
                }
+               return;
        }
 
-       unix_dgram_send_queue_free(q);
+       ret = prepare_socket_nonblock(q->sock, true);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+       }
 }
 
 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
@@ -602,12 +677,16 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                return EINVAL;
        }
 
-       /*
-        * To preserve message ordering, we have to queue a message when
-        * others are waiting in line already.
-        */
        ret = find_send_queue(ctx, dst, &q);
-       if (ret == 0) {
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (q->msgs) {
+               /*
+                * To preserve message ordering, we have to queue a
+                * message when others are waiting in line already.
+                */
                return queue_msg(q, iov, iovlen, fds, num_fds);
        }
 
@@ -616,8 +695,6 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
         */
 
        msg = (struct msghdr) {
-               .msg_name = discard_const_p(struct sockaddr_un, dst),
-               .msg_namelen = sizeof(*dst),
                .msg_iov = discard_const_p(struct iovec, iov),
                .msg_iovlen = iovlen
        };
@@ -631,7 +708,7 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                uint8_t buf[fdlen];
                msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
 
-               ret = sendmsg(ctx->sock, &msg, 0);
+               ret = sendmsg(q->sock, &msg, 0);
        }
 
        if (ret >= 0) {
@@ -647,11 +724,20 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                return errno;
        }
 
-       ret = unix_dgram_send_queue_init(ctx, dst, &q);
+       ret = queue_msg(q, iov, iovlen, fds, num_fds);
        if (ret != 0) {
+               unix_dgram_send_queue_free(q);
                return ret;
        }
-       ret = queue_msg(q, iov, iovlen, fds, num_fds);
+
+       /*
+        * While sending the messages via the pthreadpool, we set the
+        * socket back to blocking mode. When the sendqueue becomes
+        * empty and we could attempt direct sends again, the
+        * finished-jobs-handler of the pthreadpool will set it back
+        * to non-blocking.
+        */
+       ret = prepare_socket_nonblock(q->sock, false);
        if (ret != 0) {
                unix_dgram_send_queue_free(q);
                return ret;
@@ -672,8 +758,16 @@ static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
 
 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
 {
-       if (ctx->send_queues != NULL) {
-               return EBUSY;
+       struct unix_dgram_send_queue *q;
+
+       for (q = ctx->send_queues; q != NULL;) {
+               struct unix_dgram_send_queue *q_next = q->next;
+
+               if (q->msgs != NULL) {
+                       return EBUSY;
+               }
+               unix_dgram_send_queue_free(q);
+               q = q_next;
        }
 
        if (ctx->send_pool != NULL) {