unix_msg: Fix unix_dgram_send_queue_init
[metze/samba/wip.git] / source3 / lib / unix_msg / unix_msg.c
index aed1f7560b59c2a46caafa01953ee8f58dfc9158..5cbf428c894473c5ef1ade2ca00331fc2943abff 100644 (file)
 #include "system/time.h"
 #include "system/network.h"
 #include "lib/util/dlinklist.h"
-#include "pthreadpool/pthreadpool.h"
+#include "pthreadpool/pthreadpool_pipe.h"
 #include "lib/util/iov_buf.h"
 #include "lib/util/msghdr.h"
 #include <fcntl.h>
+#include "lib/util/time.h"
 
 /*
  * This file implements two abstractions: The "unix_dgram" functions implement
@@ -51,6 +52,7 @@ struct unix_dgram_send_queue {
        struct unix_dgram_ctx *ctx;
        int sock;
        struct unix_dgram_msg *msgs;
+       struct poll_timeout *timeout;
        char path[];
 };
 
@@ -69,7 +71,7 @@ struct unix_dgram_ctx {
        struct poll_watch *sock_read_watch;
        struct unix_dgram_send_queue *send_queues;
 
-       struct pthreadpool *send_pool;
+       struct pthreadpool_pipe *send_pool;
        struct poll_watch *pool_read_watch;
 
        uint8_t *recv_buf;
@@ -80,7 +82,7 @@ static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
                                    void *private_data);
 
 /* Set socket non blocking. */
-static int prepare_socket_nonblock(int sock)
+static int prepare_socket_nonblock(int sock, bool nonblock)
 {
        int flags;
 #ifdef O_NONBLOCK
@@ -97,7 +99,11 @@ static int prepare_socket_nonblock(int sock)
        if (flags == -1) {
                return errno;
        }
-       flags |= FLAG_TO_SET;
+       if (nonblock) {
+               flags |= FLAG_TO_SET;
+       } else {
+               flags &= ~FLAG_TO_SET;
+       }
        if (fcntl(sock, F_SETFL, flags) == -1) {
                return errno;
        }
@@ -127,7 +133,7 @@ static int prepare_socket_cloexec(int sock)
 /* Set socket non blocking and close on exec. */
 static int prepare_socket(int sock)
 {
-       int ret = prepare_socket_nonblock(sock);
+       int ret = prepare_socket_nonblock(sock, true);
 
        if (ret) {
                return ret;
@@ -341,18 +347,18 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
                return 0;
        }
 
-       ret = pthreadpool_init(0, &ctx->send_pool);
+       ret = pthreadpool_pipe_init(0, &ctx->send_pool);
        if (ret != 0) {
                return ret;
        }
 
-       signalfd = pthreadpool_signal_fd(ctx->send_pool);
+       signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
 
        ctx->pool_read_watch = ctx->ev_funcs->watch_new(
                ctx->ev_funcs, signalfd, POLLIN,
                unix_dgram_job_finished, ctx);
        if (ctx->pool_read_watch == NULL) {
-               pthreadpool_destroy(ctx->send_pool);
+               pthreadpool_pipe_destroy(ctx->send_pool);
                ctx->send_pool = NULL;
                return ENOMEM;
        }
@@ -360,6 +366,8 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
        return 0;
 }
 
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
+
 static int unix_dgram_send_queue_init(
        struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
        struct unix_dgram_send_queue **result)
@@ -376,6 +384,7 @@ static int unix_dgram_send_queue_init(
        }
        q->ctx = ctx;
        q->msgs = NULL;
+       q->timeout = NULL;
        memcpy(q->path, dst->sun_path, pathlen);
 
        q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
@@ -384,7 +393,7 @@ static int unix_dgram_send_queue_init(
                goto fail_free;
        }
 
-       err = prepare_socket_cloexec(q->sock);
+       err = prepare_socket(q->sock);
        if (err != 0) {
                goto fail_close;
        }
@@ -405,6 +414,12 @@ static int unix_dgram_send_queue_init(
                goto fail_close;
        }
 
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               err = ENOMEM;
+               goto fail_close;
+       }
+
        DLIST_ADD(ctx->send_queues, q);
 
        *result = q;
@@ -430,20 +445,77 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
        }
        close(q->sock);
        DLIST_REMOVE(ctx->send_queues, q);
+       ctx->ev_funcs->timeout_free(q->timeout);
        free(q);
 }
 
-static struct unix_dgram_send_queue *find_send_queue(
-       struct unix_dgram_ctx *ctx, const char *dst_sock)
+static void unix_dgram_sendq_scheduled_free_handler(
+       struct poll_timeout *t, void *private_data);
+
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
+{
+       struct unix_dgram_ctx *ctx = q->ctx;
+       struct timeval timeout;
+
+       if (q->timeout != NULL) {
+               return 0;
+       }
+
+       GetTimeOfDay(&timeout);
+       timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
+
+       q->timeout = ctx->ev_funcs->timeout_new(
+               ctx->ev_funcs,
+               timeout,
+               unix_dgram_sendq_scheduled_free_handler,
+               q);
+       if (q->timeout == NULL) {
+               return ENOMEM;
+       }
+
+       return 0;
+}
+
+static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
+                                                   void *private_data)
+{
+       struct unix_dgram_send_queue *q = private_data;
+       int ret;
+
+       q->ctx->ev_funcs->timeout_free(q->timeout);
+       q->timeout = NULL;
+
+       if (q->msgs == NULL) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+}
+
+static int find_send_queue(struct unix_dgram_ctx *ctx,
+                          const struct sockaddr_un *dst,
+                          struct unix_dgram_send_queue **ps)
 {
        struct unix_dgram_send_queue *s;
+       int ret;
 
        for (s = ctx->send_queues; s != NULL; s = s->next) {
-               if (strcmp(s->path, dst_sock) == 0) {
-                       return s;
+               if (strcmp(s->path, dst->sun_path) == 0) {
+                       *ps = s;
+                       return 0;
                }
        }
-       return NULL;
+       ret = unix_dgram_send_queue_init(ctx, dst, &s);
+       if (ret != 0) {
+               return ret;
+       }
+       *ps = s;
+       return 0;
 }
 
 static int queue_msg(struct unix_dgram_send_queue *q,
@@ -525,7 +597,7 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
        struct unix_dgram_msg *msg;
        int ret, job;
 
-       ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+       ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
        if (ret != 1) {
                return;
        }
@@ -547,14 +619,19 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
        free(msg);
 
        if (q->msgs != NULL) {
-               ret = pthreadpool_add_job(ctx->send_pool, q->sock,
-                                         unix_dgram_send_job, q->msgs);
-               if (ret == 0) {
+               ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+                                              unix_dgram_send_job, q->msgs);
+               if (ret != 0) {
+                       unix_dgram_send_queue_free(q);
                        return;
                }
+               return;
        }
 
-       unix_dgram_send_queue_free(q);
+       ret = prepare_socket_nonblock(q->sock, true);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+       }
 }
 
 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
@@ -600,12 +677,16 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                return EINVAL;
        }
 
-       /*
-        * To preserve message ordering, we have to queue a message when
-        * others are waiting in line already.
-        */
-       q = find_send_queue(ctx, dst->sun_path);
-       if (q != NULL) {
+       ret = find_send_queue(ctx, dst, &q);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if (q->msgs) {
+               /*
+                * To preserve message ordering, we have to queue a
+                * message when others are waiting in line already.
+                */
                return queue_msg(q, iov, iovlen, fds, num_fds);
        }
 
@@ -614,8 +695,6 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
         */
 
        msg = (struct msghdr) {
-               .msg_name = discard_const_p(struct sockaddr_un, dst),
-               .msg_namelen = sizeof(*dst),
                .msg_iov = discard_const_p(struct iovec, iov),
                .msg_iovlen = iovlen
        };
@@ -629,7 +708,7 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                uint8_t buf[fdlen];
                msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
 
-               ret = sendmsg(ctx->sock, &msg, 0);
+               ret = sendmsg(q->sock, &msg, 0);
        }
 
        if (ret >= 0) {
@@ -645,17 +724,26 @@ static int unix_dgram_send(struct unix_dgram_ctx *ctx,
                return errno;
        }
 
-       ret = unix_dgram_send_queue_init(ctx, dst, &q);
+       ret = queue_msg(q, iov, iovlen, fds, num_fds);
        if (ret != 0) {
+               unix_dgram_send_queue_free(q);
                return ret;
        }
-       ret = queue_msg(q, iov, iovlen, fds, num_fds);
+
+       /*
+        * While sending the messages via the pthreadpool, we set the
+        * socket back to blocking mode. When the sendqueue becomes
+        * empty and we could attempt direct sends again, the
+        * finished-jobs-handler of the pthreadpool will set it back
+        * to non-blocking.
+        */
+       ret = prepare_socket_nonblock(q->sock, false);
        if (ret != 0) {
                unix_dgram_send_queue_free(q);
                return ret;
        }
-       ret = pthreadpool_add_job(ctx->send_pool, q->sock,
-                                 unix_dgram_send_job, q->msgs);
+       ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+                                      unix_dgram_send_job, q->msgs);
        if (ret != 0) {
                unix_dgram_send_queue_free(q);
                return ret;
@@ -670,12 +758,20 @@ static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
 
 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
 {
-       if (ctx->send_queues != NULL) {
-               return EBUSY;
+       struct unix_dgram_send_queue *q;
+
+       for (q = ctx->send_queues; q != NULL;) {
+               struct unix_dgram_send_queue *q_next = q->next;
+
+               if (q->msgs != NULL) {
+                       return EBUSY;
+               }
+               unix_dgram_send_queue_free(q);
+               q = q_next;
        }
 
        if (ctx->send_pool != NULL) {
-               int ret = pthreadpool_destroy(ctx->send_pool);
+               int ret = pthreadpool_pipe_destroy(ctx->send_pool);
                if (ret != 0) {
                        return ret;
                }