#include "system/time.h"
#include "system/network.h"
#include "lib/util/dlinklist.h"
-#include "pthreadpool/pthreadpool.h"
+#include "pthreadpool/pthreadpool_pipe.h"
#include "lib/util/iov_buf.h"
#include "lib/util/msghdr.h"
#include <fcntl.h>
+#include "lib/util/time.h"
/*
* This file implements two abstractions: The "unix_dgram" functions implement
struct unix_dgram_ctx *ctx;
int sock;
struct unix_dgram_msg *msgs;
+ struct poll_timeout *timeout;
char path[];
};
struct poll_watch *sock_read_watch;
struct unix_dgram_send_queue *send_queues;
- struct pthreadpool *send_pool;
+ struct pthreadpool_pipe *send_pool;
struct poll_watch *pool_read_watch;
uint8_t *recv_buf;
void *private_data);
/* Set socket non blocking. */
-static int prepare_socket_nonblock(int sock)
+static int prepare_socket_nonblock(int sock, bool nonblock)
{
int flags;
#ifdef O_NONBLOCK
if (flags == -1) {
return errno;
}
- flags |= FLAG_TO_SET;
+ if (nonblock) {
+ flags |= FLAG_TO_SET;
+ } else {
+ flags &= ~FLAG_TO_SET;
+ }
if (fcntl(sock, F_SETFL, flags) == -1) {
return errno;
}
/* Set socket non blocking and close on exec. */
static int prepare_socket(int sock)
{
- int ret = prepare_socket_nonblock(sock);
+ int ret = prepare_socket_nonblock(sock, true);
if (ret) {
return ret;
return 0;
}
- ret = pthreadpool_init(0, &ctx->send_pool);
+ ret = pthreadpool_pipe_init(0, &ctx->send_pool);
if (ret != 0) {
return ret;
}
- signalfd = pthreadpool_signal_fd(ctx->send_pool);
+ signalfd = pthreadpool_pipe_signal_fd(ctx->send_pool);
ctx->pool_read_watch = ctx->ev_funcs->watch_new(
ctx->ev_funcs, signalfd, POLLIN,
unix_dgram_job_finished, ctx);
if (ctx->pool_read_watch == NULL) {
- pthreadpool_destroy(ctx->send_pool);
+ pthreadpool_pipe_destroy(ctx->send_pool);
ctx->send_pool = NULL;
return ENOMEM;
}
return 0;
}
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q);
+
static int unix_dgram_send_queue_init(
struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
struct unix_dgram_send_queue **result)
}
q->ctx = ctx;
q->msgs = NULL;
+ q->timeout = NULL;
memcpy(q->path, dst->sun_path, pathlen);
q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
goto fail_free;
}
- err = prepare_socket_cloexec(q->sock);
+ err = prepare_socket(q->sock);
if (err != 0) {
goto fail_close;
}
goto fail_close;
}
+ ret = unix_dgram_sendq_schedule_free(q);
+ if (ret != 0) {
+ err = ENOMEM;
+ goto fail_close;
+ }
+
DLIST_ADD(ctx->send_queues, q);
*result = q;
}
close(q->sock);
DLIST_REMOVE(ctx->send_queues, q);
+ ctx->ev_funcs->timeout_free(q->timeout);
free(q);
}
-static struct unix_dgram_send_queue *find_send_queue(
- struct unix_dgram_ctx *ctx, const char *dst_sock)
+static void unix_dgram_sendq_scheduled_free_handler(
+ struct poll_timeout *t, void *private_data);
+
+static int unix_dgram_sendq_schedule_free(struct unix_dgram_send_queue *q)
+{
+ struct unix_dgram_ctx *ctx = q->ctx;
+ struct timeval timeout;
+
+ if (q->timeout != NULL) {
+ return 0;
+ }
+
+ GetTimeOfDay(&timeout);
+ timeout.tv_sec += SENDQ_CACHE_TIME_SECS;
+
+ q->timeout = ctx->ev_funcs->timeout_new(
+ ctx->ev_funcs,
+ timeout,
+ unix_dgram_sendq_scheduled_free_handler,
+ q);
+ if (q->timeout == NULL) {
+ return ENOMEM;
+ }
+
+ return 0;
+}
+
+static void unix_dgram_sendq_scheduled_free_handler(struct poll_timeout *t,
+ void *private_data)
+{
+ struct unix_dgram_send_queue *q = private_data;
+ int ret;
+
+ q->ctx->ev_funcs->timeout_free(q->timeout);
+ q->timeout = NULL;
+
+ if (q->msgs == NULL) {
+ unix_dgram_send_queue_free(q);
+ return;
+ }
+
+ ret = unix_dgram_sendq_schedule_free(q);
+ if (ret != 0) {
+ unix_dgram_send_queue_free(q);
+ return;
+ }
+}
+
+static int find_send_queue(struct unix_dgram_ctx *ctx,
+ const struct sockaddr_un *dst,
+ struct unix_dgram_send_queue **ps)
{
struct unix_dgram_send_queue *s;
+ int ret;
for (s = ctx->send_queues; s != NULL; s = s->next) {
- if (strcmp(s->path, dst_sock) == 0) {
- return s;
+ if (strcmp(s->path, dst->sun_path) == 0) {
+ *ps = s;
+ return 0;
}
}
- return NULL;
+ ret = unix_dgram_send_queue_init(ctx, dst, &s);
+ if (ret != 0) {
+ return ret;
+ }
+ *ps = s;
+ return 0;
}
static int queue_msg(struct unix_dgram_send_queue *q,
struct unix_dgram_msg *msg;
int ret, job;
- ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
+ ret = pthreadpool_pipe_finished_jobs(ctx->send_pool, &job, 1);
if (ret != 1) {
return;
}
free(msg);
if (q->msgs != NULL) {
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
- if (ret == 0) {
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
+ if (ret != 0) {
+ unix_dgram_send_queue_free(q);
return;
}
+ return;
}
- unix_dgram_send_queue_free(q);
+ ret = prepare_socket_nonblock(q->sock, true);
+ if (ret != 0) {
+ unix_dgram_send_queue_free(q);
+ }
}
static int unix_dgram_send(struct unix_dgram_ctx *ctx,
return EINVAL;
}
- /*
- * To preserve message ordering, we have to queue a message when
- * others are waiting in line already.
- */
- q = find_send_queue(ctx, dst->sun_path);
- if (q != NULL) {
+ ret = find_send_queue(ctx, dst, &q);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (q->msgs) {
+ /*
+ * To preserve message ordering, we have to queue a
+ * message when others are waiting in line already.
+ */
return queue_msg(q, iov, iovlen, fds, num_fds);
}
*/
msg = (struct msghdr) {
- .msg_name = discard_const_p(struct sockaddr_un, dst),
- .msg_namelen = sizeof(*dst),
.msg_iov = discard_const_p(struct iovec, iov),
.msg_iovlen = iovlen
};
uint8_t buf[fdlen];
msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
- ret = sendmsg(ctx->sock, &msg, 0);
+ ret = sendmsg(q->sock, &msg, 0);
}
if (ret >= 0) {
return errno;
}
- ret = unix_dgram_send_queue_init(ctx, dst, &q);
+ ret = queue_msg(q, iov, iovlen, fds, num_fds);
if (ret != 0) {
+ unix_dgram_send_queue_free(q);
return ret;
}
- ret = queue_msg(q, iov, iovlen, fds, num_fds);
+
+ /*
+ * While sending the messages via the pthreadpool, we set the
+ * socket back to blocking mode. When the sendqueue becomes
+ * empty and we could attempt direct sends again, the
+ * finished-jobs-handler of the pthreadpool will set it back
+ * to non-blocking.
+ */
+ ret = prepare_socket_nonblock(q->sock, false);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return ret;
}
- ret = pthreadpool_add_job(ctx->send_pool, q->sock,
- unix_dgram_send_job, q->msgs);
+ ret = pthreadpool_pipe_add_job(ctx->send_pool, q->sock,
+ unix_dgram_send_job, q->msgs);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return ret;
static int unix_dgram_free(struct unix_dgram_ctx *ctx)
{
- if (ctx->send_queues != NULL) {
- return EBUSY;
+ struct unix_dgram_send_queue *q;
+
+ for (q = ctx->send_queues; q != NULL;) {
+ struct unix_dgram_send_queue *q_next = q->next;
+
+ if (q->msgs != NULL) {
+ return EBUSY;
+ }
+ unix_dgram_send_queue_free(q);
+ q = q_next;
}
if (ctx->send_pool != NULL) {
- int ret = pthreadpool_destroy(ctx->send_pool);
+ int ret = pthreadpool_pipe_destroy(ctx->send_pool);
if (ret != 0) {
return ret;
}