goto fail_free;
}
- err = prepare_socket_cloexec(q->sock);
+ err = prepare_socket(q->sock);
if (err != 0) {
goto fail_close;
}
struct unix_dgram_send_queue **ps)
{
struct unix_dgram_send_queue *s;
+ int ret;
for (s = ctx->send_queues; s != NULL; s = s->next) {
if (strcmp(s->path, dst->sun_path) == 0) {
return 0;
}
}
- return ENOENT;
+ ret = unix_dgram_send_queue_init(ctx, dst, &s);
+ if (ret != 0) {
+ return ret;
+ }
+ *ps = s;
+ return 0;
}
static int queue_msg(struct unix_dgram_send_queue *q,
}
return;
}
+
+ ret = prepare_socket_nonblock(q->sock, true);
+ if (ret != 0) {
+ unix_dgram_send_queue_free(q);
+ }
}
static int unix_dgram_send(struct unix_dgram_ctx *ctx,
return EINVAL;
}
- /*
- * To preserve message ordering, we have to queue a message when
- * others are waiting in line already.
- */
ret = find_send_queue(ctx, dst, &q);
- if (ret == 0) {
+ if (ret != 0) {
+ return ret;
+ }
+
+ if (q->msgs) {
+ /*
+ * To preserve message ordering, we have to queue a
+ * message when others are waiting in line already.
+ */
return queue_msg(q, iov, iovlen, fds, num_fds);
}
*/
msg = (struct msghdr) {
- .msg_name = discard_const_p(struct sockaddr_un, dst),
- .msg_namelen = sizeof(*dst),
.msg_iov = discard_const_p(struct iovec, iov),
.msg_iovlen = iovlen
};
uint8_t buf[fdlen];
msghdr_prep_fds(&msg, buf, fdlen, fds, num_fds);
- ret = sendmsg(ctx->sock, &msg, 0);
+ ret = sendmsg(q->sock, &msg, 0);
}
if (ret >= 0) {
return errno;
}
- ret = unix_dgram_send_queue_init(ctx, dst, &q);
+ ret = queue_msg(q, iov, iovlen, fds, num_fds);
if (ret != 0) {
+ unix_dgram_send_queue_free(q);
return ret;
}
- ret = queue_msg(q, iov, iovlen, fds, num_fds);
+
+ /*
+ * While sending the messages via the pthreadpool, we set the
+ * socket back to blocking mode. When the sendqueue becomes
+ * empty and we could attempt direct sends again, the
+ * finished-jobs-handler of the pthreadpool will set it back
+ * to non-blocking.
+ */
+ ret = prepare_socket_nonblock(q->sock, false);
if (ret != 0) {
unix_dgram_send_queue_free(q);
return ret;
static int unix_dgram_free(struct unix_dgram_ctx *ctx)
{
- if (ctx->send_queues != NULL) {
- return EBUSY;
+ struct unix_dgram_send_queue *q;
+
+ for (q = ctx->send_queues; q != NULL;) {
+ struct unix_dgram_send_queue *q_next = q->next;
+
+ if (q->msgs != NULL) {
+ return EBUSY;
+ }
+ unix_dgram_send_queue_free(q);
+ q = q_next;
}
if (ctx->send_pool != NULL) {