async_sock: add readv_send/recv()
authorStefan Metzmacher <metze@samba.org>
Thu, 12 Mar 2009 11:57:08 +0000 (12:57 +0100)
committerStefan Metzmacher <metze@samba.org>
Wed, 12 Mar 2014 23:04:41 +0000 (00:04 +0100)
The design is that the callback allocates the buffers
and provides iovec arrays which will be filled,
until an iovec count of 0 is returned or an error happens.

metze

lib/async_req/async_sock.c
lib/async_req/async_sock.h

index 74b2cb7baa803223ff14d8c01cf54c6c481b0d03..d69408d4afac04729ba33b671f7a91f3e951a07d 100644 (file)
@@ -560,6 +560,167 @@ ssize_t writev_recv(struct tevent_req *req, int *perrno)
        return state->total_size;
 }
 
+struct readv_state {
+       int fd;
+
+       int (*next_vector)(void *private_data,
+                          TALLOC_CTX *mem_ctx,
+                          struct iovec **vector,
+                          size_t *count);
+       void *private_data;
+
+       struct iovec *iov;
+       size_t count;
+
+       int total_read;
+};
+
+static bool readv_ask_for_vector(struct tevent_req *req,
+                                struct readv_state *state);
+static void readv_handler(struct tevent_context *ev,
+                         struct tevent_fd *fde,
+                         uint16_t flags, void *private_data);
+
+struct tevent_req *readv_send(TALLOC_CTX *mem_ctx,
+                             struct tevent_context *ev,
+                             int fd,
+                             int (*next_vector)(void *private_data,
+                                                TALLOC_CTX *mem_ctx,
+                                                struct iovec **vector,
+                                                size_t *count),
+                             void *private_data)
+{
+       struct tevent_req *req;
+       struct readv_state *state;
+       struct tevent_fd *fde;
+
+       req = tevent_req_create(mem_ctx, &state, struct readv_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->fd = fd;
+       state->next_vector = next_vector;
+       state->private_data = private_data;
+       state->iov = NULL;
+       state->count = 0;
+       state->total_read = 0;
+
+       if (!readv_ask_for_vector(req, state)) {
+               return tevent_req_post(req, ev);
+       }
+
+       fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, readv_handler,
+                           req);
+       if (fde == NULL) {
+               goto fail;
+       }
+       return req;
+ fail:
+       TALLOC_FREE(req);
+       return NULL;
+
+}
+
+static bool readv_ask_for_vector(struct tevent_req *req,
+                                struct readv_state *state)
+{
+       int ret;
+       size_t to_read = 0;
+       size_t i;
+
+       talloc_free(state->iov);
+       state->iov = NULL;
+       state->count = 0;
+
+       ret = state->next_vector(state->private_data,
+                                state, &state->iov, &state->count);
+       if (ret == -1) {
+               if (tevent_req_error(req, errno)) {
+                       return false;
+               }
+       }
+
+       for (i=0; i < state->count; i++) {
+               size_t tmp = to_read;
+               tmp += state->iov[i].iov_len;
+
+               if (tmp < to_read) {
+                       tevent_req_error(req, EMSGSIZE);
+                       return false;
+               }
+
+               to_read = tmp;
+       }
+
+       if (to_read == 0) {
+               tevent_req_done(req);
+               return false;
+       }
+
+       if (state->total_read + to_read < state->total_read) {
+               tevent_req_error(req, EMSGSIZE);
+               return false;
+       }
+
+       return true;
+}
+
+static void readv_handler(struct tevent_context *ev,
+                         struct tevent_fd *fde,
+                         uint16_t flags, void *private_data)
+{
+       struct tevent_req *req = talloc_get_type_abort(
+               private_data, struct tevent_req);
+       struct readv_state *state =
+               tevent_req_data(req, struct readv_state);
+       int nread;
+
+       nread = readv(state->fd, state->iov, state->count);
+       if (nread == -1) {
+               tevent_req_error(req, errno);
+               return;
+       }
+       if (nread == 0) {
+               tevent_req_error(req, EPIPE);
+               return;
+       }
+
+       state->total_read += nread;
+
+       while (nread > 0) {
+               if (nread < state->iov[0].iov_len) {
+                       uint8_t *base;
+                       base = (uint8_t *)state->iov[0].iov_base;
+                       base += nread;
+                       state->iov[0].iov_base = base;
+                       state->iov[0].iov_len -= nread;
+                       break;
+               }
+               nread -= state->iov[0].iov_len;
+               state->iov += 1;
+               state->count -= 1;
+       }
+
+       if (state->count) {
+               /* we have more to read */
+               return;
+       }
+
+       /* ask the callback for a new vector we should fill */
+       readv_ask_for_vector(req, state);
+}
+
+int readv_recv(struct tevent_req *req, int *perrno)
+{
+       struct readv_state *state =
+               tevent_req_data(req, struct readv_state);
+
+       if (tevent_req_is_unix_error(req, perrno)) {
+               return -1;
+       }
+       return state->total_read;
+}
+
 struct read_packet_state {
        int fd;
        uint8_t *buf;
index 494b92eb29f87f66a054478c68aaf90b42b42742..33195a679d2a7697c7ba5a75aff015f59d8554b1 100644 (file)
@@ -54,6 +54,16 @@ struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                               struct iovec *iov, int count);
 ssize_t writev_recv(struct tevent_req *req, int *perrno);
 
+struct tevent_req *readv_send(TALLOC_CTX *mem_ctx,
+                             struct tevent_context *ev,
+                             int fd,
+                             int (*next_vector)(void *private_data,
+                                                TALLOC_CTX *mem_ctx,
+                                                struct iovec **vector,
+                                                size_t *count),
+                             void *private_data);
+int readv_recv(struct tevent_req *req, int *perrno);
+
 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
                                    struct tevent_context *ev,
                                    int fd, size_t initial,