unix_msg: introduce send queue caching
authorRalph Boehme <slow@samba.org>
Fri, 19 Aug 2016 14:25:11 +0000 (16:25 +0200)
committerJeremy Allison <jra@samba.org>
Mon, 12 Sep 2016 22:19:27 +0000 (00:19 +0200)
This introduces caching of unix datagram send queues. Right now send
queues are only created for peers if the channel to the peer is full and
a send reported EWOULDBLOCK.

At this stage, performance will actually be slightly worse, because now
if there's a cached queue for a peer without queued messages, we don't
attempt direct send anymore until the send queue is removed from the
cache.

The next commit will modify unix_msg to always create a send queue with
the datagram socket in connected mode and again attempt an non-blocking
send on the connected socket first. Then only if that returns
EWOULDBLOCK, the send has to go through the threadpool.

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/unix_msg/unix_msg.c
source3/lib/unix_msg/unix_msg.h
source3/lib/unix_msg/wscript_build

index f86822e1a10b8a9d722f94b1c70e7d84e5f8672c..73d97404af2412849be7a4b4d06fc09a10c533a2 100644 (file)
@@ -26,6 +26,7 @@
 #include "lib/util/iov_buf.h"
 #include "lib/util/msghdr.h"
 #include <fcntl.h>
+#include "lib/util/time.h"
 
 /*
  * This file implements two abstractions: The "unix_dgram" functions implement
@@ -51,6 +52,7 @@ struct unix_dgram_send_queue {
        struct unix_dgram_ctx *ctx;
        int sock;
        struct unix_dgram_msg *msgs;
+       struct poll_timeout *timeout;
        char path[];
 };
 
@@ -364,6 +366,8 @@ static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
        return 0;
 }
 
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
+
 static int unix_dgram_send_queue_init(
        struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
        struct unix_dgram_send_queue **result)
@@ -380,6 +384,7 @@ static int unix_dgram_send_queue_init(
        }
        q->ctx = ctx;
        q->msgs = NULL;
+       q->timeout = NULL;
        memcpy(q->path, dst->sun_path, pathlen);
 
        q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
@@ -411,6 +416,12 @@ static int unix_dgram_send_queue_init(
 
        DLIST_ADD(ctx->send_queues, q);
 
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               err = ENOMEM;
+               goto fail_close;
+       }
+
        *result = q;
        return 0;
 
@@ -434,9 +445,59 @@ static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
        }
        close(q->sock);
        DLIST_REMOVE(ctx->send_queues, q);
+       ctx->ev_funcs->timeout_free(q->timeout);
        free(q);
 }
 
+static void unix_dgram_sendq_scheduled_free_handler(
+       struct poll_timeout *t, void *private_data);
+
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
+{
+       struct unix_dgram_ctx *ctx = q->ctx;
+       struct timeval timeout;
+
+       if (q->timeout != NULL) {
+               return 0;
+       }
+
+       GetTimeOfDay(&timeout);
+       timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
+
+       q->timeout = ctx->ev_funcs->timeout_new(
+               ctx->ev_funcs,
+               timeout,
+               unix_dgram_sendq_scheduled_free_handler,
+               q);
+       if (q->timeout == NULL) {
+               unix_dgram_send_queue_free(q);
+               return ENOMEM;
+       }
+
+       return 0;
+}
+
+static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
+                                                   void *private_data)
+{
+       struct unix_dgram_send_queue *q = private_data;
+       int ret;
+
+       q->ctx->ev_funcs->timeout_free(q->timeout);
+       q->timeout = NULL;
+
+       if (q->msgs == NULL) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+
+       ret = unix_dgram_sendq_schedule_free(q);
+       if (ret != 0) {
+               unix_dgram_send_queue_free(q);
+               return;
+       }
+}
+
 static int find_send_queue(struct unix_dgram_ctx *ctx,
                           const struct sockaddr_un *dst,
                           struct unix_dgram_send_queue **ps)
@@ -555,12 +616,12 @@ static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
        if (q->msgs != NULL) {
                ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
                                               unix_dgram_send_job, q->msgs);
-               if (ret == 0) {
+               if (ret != 0) {
+                       unix_dgram_send_queue_free(q);
                        return;
                }
+               return;
        }
-
-       unix_dgram_send_queue_free(q);
 }
 
 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
index 34c166bc666bdebab7ac51614c1c2f05a5555985..375d4ac429ae86d09d0438f1dcace88c70bb4323 100644 (file)
@@ -116,4 +116,6 @@ int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
  */
 int unix_msg_free(struct unix_msg_ctx *ctx);
 
+#define SENDQ_CACHE_TIME_SECS 10
+
 #endif
index b16d52cc716be927f3d4f34f63ff655a9c7f8c58..469f87efdb336137abf58a46732c5546835b5f45 100644 (file)
@@ -2,7 +2,7 @@
 
 bld.SAMBA3_SUBSYSTEM('UNIX_MSG',
                      source='unix_msg.c',
-                    deps='replace PTHREADPOOL iov_buf msghdr')
+                    deps='replace PTHREADPOOL iov_buf msghdr time-basic')
 
 bld.SAMBA3_BINARY('unix_msg_test',
                   source='tests.c',