lib: Remove an obsolete comment
[samba.git] / lib / async_req / async_sock.c
index afa14567071c5eb7106ee9887459640acdc73194..795a2c63dbaceaedc5fbf7089b6726796bcd5e42 100644 (file)
@@ -28,6 +28,7 @@
 #include <tevent.h>
 #include "lib/async_req/async_sock.h"
 #include "lib/util/iov_buf.h"
+#include "lib/util/util_net.h"
 
 /* Note: lib/util/ is currently GPL */
 #include "lib/util/tevent_unix.h"
@@ -74,6 +75,7 @@ struct tevent_req *async_connect_send(
 {
        struct tevent_req *req;
        struct async_connect_state *state;
+       int ret;
 
        req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
        if (req == NULL) {
@@ -105,7 +107,11 @@ struct tevent_req *async_connect_send(
        }
        memcpy(&state->address, address, address_len);
 
-       set_blocking(fd, false);
+       ret = set_blocking(fd, false);
+       if (ret == -1) {
+               tevent_req_error(req, errno);
+               return tevent_req_post(req, ev);
+       }
 
        if (state->before_connect != NULL) {
                state->before_connect(state->private_data);
@@ -122,24 +128,24 @@ struct tevent_req *async_connect_send(
                return tevent_req_post(req, ev);
        }
 
-       /**
-        * A number of error messages show that something good is progressing
-        * and that we have to wait for readability.
+       /*
+        * The only errno indicating that an initial connect is still
+        * in flight is EINPROGRESS.
         *
-        * If none of them are present, bail out.
+        * This allows callers like open_socket_out_send() to reuse
+        * fds and call us with an fd for which the connect is still
+        * in flight. The proper thing to do for callers would be
+        * closing the fd and starting from scratch with a fresh
+        * socket.
         */
 
-       if (!(errno == EINPROGRESS || errno == EALREADY ||
-#ifdef EISCONN
-             errno == EISCONN ||
-#endif
-             errno == EAGAIN || errno == EINTR)) {
+       if (errno != EINPROGRESS) {
                tevent_req_error(req, errno);
                return tevent_req_post(req, ev);
        }
 
        state->fde = tevent_add_fd(ev, state, fd,
-                                  TEVENT_FD_READ | TEVENT_FD_WRITE,
+                                  TEVENT_FD_ERROR|TEVENT_FD_WRITE,
                                   async_connect_connected, req);
        if (state->fde == NULL) {
                tevent_req_error(req, ENOMEM);
@@ -156,7 +162,13 @@ static void async_connect_cleanup(struct tevent_req *req,
 
        TALLOC_FREE(state->fde);
        if (state->fd != -1) {
-               fcntl(state->fd, F_SETFL, state->old_sockflags);
+               int ret;
+
+               ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
+               if (ret == -1) {
+                       abort();
+               }
+
                state->fd = -1;
        }
 }
@@ -178,27 +190,32 @@ static void async_connect_connected(struct tevent_context *ev,
        struct async_connect_state *state =
                tevent_req_data(req, struct async_connect_state);
        int ret;
-
-       if (state->before_connect != NULL) {
-               state->before_connect(state->private_data);
-       }
-
-       ret = connect(state->fd, (struct sockaddr *)(void *)&state->address,
-                     state->address_len);
-
-       if (state->after_connect != NULL) {
-               state->after_connect(state->private_data);
-       }
-
-       if (ret == 0) {
-               tevent_req_done(req);
+       int socket_error = 0;
+       socklen_t slen = sizeof(socket_error);
+
+       ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
+                        &socket_error, &slen);
+
+       if (ret != 0) {
+               /*
+                * According to Stevens this is the Solaris behaviour
+                * in case the connection encountered an error:
+                * getsockopt() fails, error is in errno
+                */
+               tevent_req_error(req, errno);
                return;
        }
-       if (errno == EINPROGRESS) {
-               /* Try again later, leave the fde around */
+
+       if (socket_error != 0) {
+               /*
+                * Berkeley derived implementations (including) Linux
+                * return the pending error via socket_error.
+                */
+               tevent_req_error(req, socket_error);
                return;
        }
-       tevent_req_error(req, errno);
+
+       tevent_req_done(req);
        return;
 }
 
@@ -216,7 +233,9 @@ int async_connect_recv(struct tevent_req *req, int *perrno)
 
 struct writev_state {
        struct tevent_context *ev;
+       struct tevent_queue_entry *queue_entry;
        int fd;
+       struct tevent_fd *fde;
        struct iovec *iov;
        int count;
        size_t total_size;
@@ -224,6 +243,9 @@ struct writev_state {
        bool err_on_readability;
 };
 
+static void writev_cleanup(struct tevent_req *req,
+                          enum tevent_req_state req_state);
+static bool writev_cancel(struct tevent_req *req);
 static void writev_trigger(struct tevent_req *req, void *private_data);
 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
                           uint16_t flags, void *private_data);
@@ -246,88 +268,85 @@ struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        state->count = count;
        state->iov = (struct iovec *)talloc_memdup(
                state, iov, sizeof(struct iovec) * count);
-       if (state->iov == NULL) {
-               goto fail;
+       if (tevent_req_nomem(state->iov, req)) {
+               return tevent_req_post(req, ev);
        }
-       state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
-       state->err_on_readability = err_on_readability;
+       state->flags = TEVENT_FD_WRITE | TEVENT_FD_ERROR;
+       if (err_on_readability) {
+               state->flags |= TEVENT_FD_READ;
+       }
+
+       tevent_req_set_cleanup_fn(req, writev_cleanup);
+       tevent_req_set_cancel_fn(req, writev_cancel);
 
        if (queue == NULL) {
-               struct tevent_fd *fde;
-               fde = tevent_add_fd(state->ev, state, state->fd,
+               state->fde = tevent_add_fd(state->ev, state, state->fd,
                                    state->flags, writev_handler, req);
-               if (tevent_req_nomem(fde, req)) {
+               if (tevent_req_nomem(state->fde, req)) {
                        return tevent_req_post(req, ev);
                }
                return req;
        }
 
-       if (!tevent_queue_add(queue, ev, req, writev_trigger, NULL)) {
-               goto fail;
+       /*
+        * writev_trigger tries a nonblocking write. If that succeeds,
+        * we can't directly notify the callback to call
+        * writev_recv. The callback would TALLOC_FREE(req) after
+        * calling writev_recv even before writev_trigger can inspect
+        * it for success.
+        */
+       tevent_req_defer_callback(req, ev);
+
+       state->queue_entry = tevent_queue_add_optimize_empty(
+               queue, ev, req, writev_trigger, NULL);
+       if (tevent_req_nomem(state->queue_entry, req)) {
+               return tevent_req_post(req, ev);
+       }
+       if (!tevent_req_is_in_progress(req)) {
+               return tevent_req_post(req, ev);
        }
        return req;
- fail:
-       TALLOC_FREE(req);
-       return NULL;
 }
 
-static void writev_trigger(struct tevent_req *req, void *private_data)
+static void writev_cleanup(struct tevent_req *req,
+                          enum tevent_req_state req_state)
 {
        struct writev_state *state = tevent_req_data(req, struct writev_state);
-       struct tevent_fd *fde;
 
-       fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
-                           writev_handler, req);
-       if (fde == NULL) {
-               tevent_req_error(req, ENOMEM);
-       }
+       TALLOC_FREE(state->queue_entry);
+       TALLOC_FREE(state->fde);
 }
 
-static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
-                          uint16_t flags, void *private_data)
+static bool writev_cancel(struct tevent_req *req)
 {
-       struct tevent_req *req = talloc_get_type_abort(
-               private_data, struct tevent_req);
-       struct writev_state *state =
-               tevent_req_data(req, struct writev_state);
-       size_t written;
-       bool ok;
+       struct writev_state *state = tevent_req_data(req, struct writev_state);
 
-       if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
-               int ret, value;
+       if (state->total_size > 0) {
+               /*
+                * We've already started to write :-(
+                */
+               return false;
+       }
 
-               if (state->err_on_readability) {
-                       /* Readable and the caller wants an error on read. */
-                       tevent_req_error(req, EPIPE);
-                       return;
-               }
+       TALLOC_FREE(state->queue_entry);
+       TALLOC_FREE(state->fde);
 
-               /* Might be an error. Check if there are bytes to read */
-               ret = ioctl(state->fd, FIONREAD, &value);
-               /* FIXME - should we also check
-                  for ret == 0 and value == 0 here ? */
-               if (ret == -1) {
-                       /* There's an error. */
-                       tevent_req_error(req, EPIPE);
-                       return;
-               }
-               /* A request for TEVENT_FD_READ will succeed from now and
-                  forevermore until the bytes are read so if there was
-                  an error we'll wait until we do read, then get it in
-                  the read callback function. Until then, remove TEVENT_FD_READ
-                  from the flags we're waiting for. */
-               state->flags &= ~TEVENT_FD_READ;
-               TEVENT_FD_NOT_READABLE(fde);
-
-               /* If not writable, we're done. */
-               if (!(flags & TEVENT_FD_WRITE)) {
-                       return;
-               }
-       }
+       tevent_req_defer_callback(req, state->ev);
+       tevent_req_error(req, ECANCELED);
+       return true;
+}
+
+static void writev_do(struct tevent_req *req, struct writev_state *state)
+{
+       ssize_t written;
+       bool ok;
 
        written = writev(state->fd, state->iov, state->count);
-       if ((written == -1) && (errno == EINTR)) {
-               /* retry */
+       if ((written == -1) &&
+           ((errno == EINTR) ||
+            (errno == EAGAIN) ||
+            (errno == EWOULDBLOCK))) {
+               /* retry after going through the tevent loop */
                return;
        }
        if (written == -1) {
@@ -352,25 +371,78 @@ static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
        }
 }
 
+static void writev_trigger(struct tevent_req *req, void *private_data)
+{
+       struct writev_state *state = tevent_req_data(req, struct writev_state);
+
+       state->queue_entry = NULL;
+
+       writev_do(req, state);
+       if (!tevent_req_is_in_progress(req)) {
+               return;
+       }
+
+       state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
+                           writev_handler, req);
+       if (tevent_req_nomem(state->fde, req)) {
+               return;
+       }
+}
+
+static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
+                          uint16_t flags, void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct writev_state *state =
+               tevent_req_data(req, struct writev_state);
+
+       if (flags & TEVENT_FD_ERROR) {
+               /*
+                * There's an error, for legacy reasons
+                * we just use EPIPE instead of a more
+                * detailed error using
+                * samba_socket_poll_or_sock_error().
+                */
+               tevent_req_error(req, EPIPE);
+               return;
+       }
+
+       if (flags & TEVENT_FD_READ) {
+               /* Readable and the caller wants an error on read. */
+               tevent_req_error(req, EPIPE);
+               return;
+       }
+
+       writev_do(req, state);
+}
+
 ssize_t writev_recv(struct tevent_req *req, int *perrno)
 {
        struct writev_state *state =
                tevent_req_data(req, struct writev_state);
+       ssize_t ret;
 
        if (tevent_req_is_unix_error(req, perrno)) {
+               tevent_req_received(req);
                return -1;
        }
-       return state->total_size;
+       ret = state->total_size;
+       tevent_req_received(req);
+       return ret;
 }
 
 struct read_packet_state {
        int fd;
+       struct tevent_fd *fde;
        uint8_t *buf;
        size_t nread;
        ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
        void *private_data;
 };
 
+static void read_packet_cleanup(struct tevent_req *req,
+                                enum tevent_req_state req_state);
 static void read_packet_handler(struct tevent_context *ev,
                                struct tevent_fd *fde,
                                uint16_t flags, void *private_data);
@@ -383,12 +455,11 @@ struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
                                                    void *private_data),
                                    void *private_data)
 {
-       struct tevent_req *result;
+       struct tevent_req *req;
        struct read_packet_state *state;
-       struct tevent_fd *fde;
 
-       result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
-       if (result == NULL) {
+       req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
+       if (req == NULL) {
                return NULL;
        }
        state->fd = fd;
@@ -396,20 +467,29 @@ struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
        state->more = more;
        state->private_data = private_data;
 
+       tevent_req_set_cleanup_fn(req, read_packet_cleanup);
+
        state->buf = talloc_array(state, uint8_t, initial);
-       if (state->buf == NULL) {
-               goto fail;
+       if (tevent_req_nomem(state->buf, req)) {
+               return tevent_req_post(req, ev);
        }
 
-       fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
-                           result);
-       if (fde == NULL) {
-               goto fail;
+       state->fde = tevent_add_fd(ev, state, fd,
+                                  TEVENT_FD_READ, read_packet_handler,
+                                  req);
+       if (tevent_req_nomem(state->fde, req)) {
+               return tevent_req_post(req, ev);
        }
-       return result;
- fail:
-       TALLOC_FREE(result);
-       return NULL;
+       return req;
+}
+
+static void read_packet_cleanup(struct tevent_req *req,
+                          enum tevent_req_state req_state)
+{
+       struct read_packet_state *state =
+               tevent_req_data(req, struct read_packet_state);
+
+       TALLOC_FREE(state->fde);
 }
 
 static void read_packet_handler(struct tevent_context *ev,
@@ -489,25 +569,30 @@ ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
                tevent_req_data(req, struct read_packet_state);
 
        if (tevent_req_is_unix_error(req, perrno)) {
+               tevent_req_received(req);
                return -1;
        }
        *pbuf = talloc_move(mem_ctx, &state->buf);
+       tevent_req_received(req);
        return talloc_get_size(*pbuf);
 }
 
 struct wait_for_read_state {
-       struct tevent_req *req;
        struct tevent_fd *fde;
+       int fd;
+       bool check_errors;
 };
 
+static void wait_for_read_cleanup(struct tevent_req *req,
+                                 enum tevent_req_state req_state);
 static void wait_for_read_done(struct tevent_context *ev,
                               struct tevent_fd *fde,
                               uint16_t flags,
                               void *private_data);
 
 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
-                                     struct tevent_context *ev,
-                                     int fd)
+                                     struct tevent_context *ev, int fd,
+                                     bool check_errors)
 {
        struct tevent_req *req;
        struct wait_for_read_state *state;
@@ -516,36 +601,167 @@ struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
        if (req == NULL) {
                return NULL;
        }
-       state->req = req;
+
+       tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
+
        state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
-                                  wait_for_read_done, state);
+                                  wait_for_read_done, req);
        if (tevent_req_nomem(state->fde, req)) {
                return tevent_req_post(req, ev);
        }
+
+       state->fd = fd;
+       state->check_errors = check_errors;
        return req;
 }
 
+static void wait_for_read_cleanup(struct tevent_req *req,
+                                 enum tevent_req_state req_state)
+{
+       struct wait_for_read_state *state =
+               tevent_req_data(req, struct wait_for_read_state);
+
+       TALLOC_FREE(state->fde);
+}
+
 static void wait_for_read_done(struct tevent_context *ev,
                               struct tevent_fd *fde,
                               uint16_t flags,
                               void *private_data)
 {
-       struct wait_for_read_state *state = talloc_get_type_abort(
-               private_data, struct wait_for_read_state);
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct wait_for_read_state *state =
+           tevent_req_data(req, struct wait_for_read_state);
+       int ret, available;
 
-       if (flags & TEVENT_FD_READ) {
-               TALLOC_FREE(state->fde);
-               tevent_req_done(state->req);
+       if ((flags & TEVENT_FD_READ) == 0) {
+               return;
        }
+
+       if (!state->check_errors) {
+               tevent_req_done(req);
+               return;
+       }
+
+       ret = ioctl(state->fd, FIONREAD, &available);
+
+       if ((ret == -1) && (errno == EINTR)) {
+               /* come back later */
+               return;
+       }
+
+       if (ret == -1) {
+               tevent_req_error(req, errno);
+               return;
+       }
+
+       if (available == 0) {
+               tevent_req_error(req, EPIPE);
+               return;
+       }
+
+       tevent_req_done(req);
 }
 
 bool wait_for_read_recv(struct tevent_req *req, int *perr)
 {
-       int err;
+       int err = tevent_req_simple_recv_unix(req);
 
-       if (tevent_req_is_unix_error(req, &err)) {
+       if (err != 0) {
                *perr = err;
                return false;
        }
+
        return true;
 }
+
+struct accept_state {
+       struct tevent_fd *fde;
+       int listen_sock;
+       struct samba_sockaddr addr;
+       int sock;
+};
+
+static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
+                          uint16_t flags, void *private_data);
+
+struct tevent_req *accept_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+                              int listen_sock)
+{
+       struct tevent_req *req;
+       struct accept_state *state;
+
+       req = tevent_req_create(mem_ctx, &state, struct accept_state);
+       if (req == NULL) {
+               return NULL;
+       }
+
+       state->listen_sock = listen_sock;
+
+       state->fde = tevent_add_fd(ev, state, listen_sock, TEVENT_FD_READ,
+                                  accept_handler, req);
+       if (tevent_req_nomem(state->fde, req)) {
+               return tevent_req_post(req, ev);
+       }
+       return req;
+}
+
+static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
+                          uint16_t flags, void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct accept_state *state = tevent_req_data(req, struct accept_state);
+       int ret;
+
+       TALLOC_FREE(state->fde);
+
+       if ((flags & TEVENT_FD_READ) == 0) {
+               tevent_req_error(req, EIO);
+               return;
+       }
+
+       state->addr.sa_socklen = sizeof(state->addr.u);
+
+       ret = accept(state->listen_sock,
+                    &state->addr.u.sa,
+                    &state->addr.sa_socklen);
+       if ((ret == -1) && (errno == EINTR)) {
+               /* retry */
+               return;
+       }
+       if (ret == -1) {
+               tevent_req_error(req, errno);
+               return;
+       }
+       smb_set_close_on_exec(ret);
+       state->sock = ret;
+       tevent_req_done(req);
+}
+
+int accept_recv(struct tevent_req *req,
+               int *listen_sock,
+               struct samba_sockaddr *paddr,
+               int *perr)
+{
+       struct accept_state *state = tevent_req_data(req, struct accept_state);
+       int sock = state->sock;
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               if (perr != NULL) {
+                       *perr = err;
+               }
+               tevent_req_received(req);
+               return -1;
+       }
+       if (listen_sock != NULL) {
+               *listen_sock = state->listen_sock;
+       }
+       if (paddr != NULL) {
+               *paddr = state->addr;
+       }
+       tevent_req_received(req);
+       return sock;
+}