s3:libsmb: rewrite cli_pull* to use smb1cli_conn_req_possible()
authorStefan Metzmacher <metze@samba.org>
Tue, 13 Aug 2013 16:03:50 +0000 (18:03 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 15 Aug 2013 07:07:06 +0000 (09:07 +0200)
This works out if it's possible to ship the next request dynamically
instead of relying on fixed values.

The default window size is 16 MByte.

We limit the number of outstanding chunks/requests to 256.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/libsmb/clireadwrite.c

index 550c52b15e0690b9d03c100c304f81f72b8a9379..dd5d4c2865b2a4c4ced67be79bf6d529cbb42da0 100644 (file)
@@ -388,23 +388,9 @@ static NTSTATUS cli_readall_recv(struct tevent_req *req, ssize_t *received,
        return NT_STATUS_OK;
 }
 
-struct cli_pull_subreq {
-       struct tevent_req *req;
-       ssize_t received;
-       uint8_t *buf;
-};
-
-/*
- * Parallel read support.
- *
- * cli_pull sends as many read&x requests as the server would allow via
- * max_mux at a time. When replies flow back in, the data is written into
- * the callback function "sink" in the right order.
- */
+struct cli_pull_chunk;
 
 struct cli_pull_state {
-       struct tevent_req *req;
-
        struct tevent_context *ev;
        struct cli_state *cli;
        uint16_t fnum;
@@ -415,54 +401,49 @@ struct cli_pull_state {
        void *priv;
 
        size_t chunk_size;
+       off_t next_offset;
+       off_t remaining;
 
        /*
-        * Outstanding requests
-        */
-       uint16_t max_reqs;
-       int num_reqs;
-       struct cli_pull_subreq *reqs;
-
-       /*
-        * For how many bytes did we send requests already?
-        */
-       off_t requested;
-
-       /*
-        * Next request index to push into "sink". This walks around the "req"
-        * array, taking care that the requests are pushed to "sink" in the
-        * right order. If necessary (i.e. replies don't come in in the right
-        * order), replies are held back in "reqs".
+        * How many bytes did we push into "sink"?
         */
-       int top_req;
+       off_t pushed;
 
        /*
-        * How many bytes did we push into "sink"?
+        * Outstanding requests
+        *
+        * The maximum is 256:
+        * - which would be a window of 256 MByte
+        *   for SMB2 with multi-credit
+        *   or smb1 unix extensions.
         */
-
-       off_t pushed;
+       uint16_t max_chunks;
+       uint16_t num_chunks;
+       uint16_t num_waiting;
+       struct cli_pull_chunk *chunks;
 };
 
-static char *cli_pull_print(struct tevent_req *req, TALLOC_CTX *mem_ctx)
-{
-       struct cli_pull_state *state = tevent_req_data(
-               req, struct cli_pull_state);
-       char *result;
-
-       result = tevent_req_default_print(req, mem_ctx);
-       if (result == NULL) {
-               return NULL;
-       }
-
-       return talloc_asprintf_append_buffer(
-               result, "num_reqs=%d, top_req=%d",
-               state->num_reqs, state->top_req);
-}
+struct cli_pull_chunk {
+       struct cli_pull_chunk *prev, *next;
+       struct tevent_req *req;/* This is the main request! Not the subreq */
+       struct tevent_req *subreq;
+       off_t ofs;
+       uint8_t *buf;
+       size_t total_size;
+       size_t tmp_size;
+       bool done;
+};
 
-static void cli_pull_read_done(struct tevent_req *read_req);
+static void cli_pull_setup_chunks(struct tevent_req *req);
+static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk);
+static void cli_pull_chunk_done(struct tevent_req *subreq);
 
 /*
- * Prepare an async pull request
+ * Parallel read support.
+ *
+ * cli_pull sends as many read&x requests as the server would allow via
+ * max_mux at a time. When replies flow back in, the data is written into
+ * the callback function "sink" in the right order.
  */
 
 struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
@@ -476,16 +457,13 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
 {
        struct tevent_req *req;
        struct cli_pull_state *state;
-       int i;
        size_t page_size = 1024;
+       uint64_t tmp64;
 
        req = tevent_req_create(mem_ctx, &state, struct cli_pull_state);
        if (req == NULL) {
                return NULL;
        }
-       tevent_req_set_print_fn(req, cli_pull_print);
-       state->req = req;
-
        state->cli = cli;
        state->ev = ev;
        state->fnum = fnum;
@@ -493,9 +471,8 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
        state->size = size;
        state->sink = sink;
        state->priv = priv;
-
-       state->pushed = 0;
-       state->top_req = 0;
+       state->next_offset = start_offset;
+       state->remaining = size;
 
        if (size == 0) {
                tevent_req_done(req);
@@ -507,155 +484,251 @@ struct tevent_req *cli_pull_send(TALLOC_CTX *mem_ctx,
                state->chunk_size &= ~(page_size - 1);
        }
 
-       state->max_reqs = smbXcli_conn_max_requests(cli->conn);
+       if (window_size == 0) {
+               /*
+                * We use 16 MByte as default window size.
+                */
+               window_size = 16 * 1024 * 1024;
+       }
+
+       tmp64 = window_size/state->chunk_size;
+       if ((window_size % state->chunk_size) > 0) {
+               tmp64 += 1;
+       }
+       tmp64 = MAX(tmp64, 1);
+       tmp64 = MIN(tmp64, 256);
+       state->max_chunks = tmp64;
+
+       /*
+        * We defer the callback because of the complex
+        * substate/subfunction logic
+        */
+       tevent_req_defer_callback(req, ev);
+
+       cli_pull_setup_chunks(req);
+       if (!tevent_req_is_in_progress(req)) {
+               return tevent_req_post(req, ev);
+       }
+
+       return req;
+}
 
-       state->num_reqs = MAX(window_size/state->chunk_size, 1);
-       state->num_reqs = MIN(state->num_reqs, state->max_reqs);
+static void cli_pull_setup_chunks(struct tevent_req *req)
+{
+       struct cli_pull_state *state =
+               tevent_req_data(req,
+               struct cli_pull_state);
+       struct cli_pull_chunk *chunk, *next = NULL;
+       size_t i;
 
-       state->reqs = talloc_zero_array(state, struct cli_pull_subreq,
-                                       state->num_reqs);
-       if (state->reqs == NULL) {
-               goto failed;
+       for (chunk = state->chunks; chunk; chunk = next) {
+               /*
+                * Note that chunk might be removed from this call.
+                */
+               next = chunk->next;
+               cli_pull_chunk_ship(chunk);
+               if (!tevent_req_is_in_progress(req)) {
+                       return;
+               }
        }
 
-       state->requested = 0;
+       for (i = state->num_chunks; i < state->max_chunks; i++) {
 
-       for (i=0; i<state->num_reqs; i++) {
-               struct cli_pull_subreq *subreq = &state->reqs[i];
-               off_t size_left;
-               size_t request_thistime;
+               if (state->num_waiting > 0) {
+                       return;
+               }
 
-               if (state->requested >= size) {
-                       state->num_reqs = i;
+               if (state->remaining == 0) {
                        break;
                }
 
-               size_left = size - state->requested;
-               request_thistime = MIN(size_left, state->chunk_size);
+               chunk = talloc_zero(state, struct cli_pull_chunk);
+               if (tevent_req_nomem(chunk, req)) {
+                       return;
+               }
+               chunk->req = req;
+               chunk->ofs = state->next_offset;
+               chunk->total_size = MIN(state->remaining, state->chunk_size);
+               state->next_offset += chunk->total_size;
+               state->remaining -= chunk->total_size;
 
-               subreq->req = cli_readall_send(
-                       state->reqs, ev, cli, fnum,
-                       state->start_offset + state->requested,
-                       request_thistime);
+               DLIST_ADD_END(state->chunks, chunk, NULL);
+               state->num_chunks++;
+               state->num_waiting++;
 
-               if (subreq->req == NULL) {
-                       goto failed;
+               cli_pull_chunk_ship(chunk);
+               if (!tevent_req_is_in_progress(req)) {
+                       return;
                }
-               tevent_req_set_callback(subreq->req, cli_pull_read_done, req);
-               state->requested += request_thistime;
        }
-       return req;
 
-failed:
-       TALLOC_FREE(req);
-       return NULL;
-}
+       if (state->remaining > 0) {
+               return;
+       }
 
-/*
- * Handle incoming read replies, push the data into sink and send out new
- * requests if necessary.
- */
+       if (state->num_chunks > 0) {
+               return;
+       }
+
+       tevent_req_done(req);
+}
 
-static void cli_pull_read_done(struct tevent_req *subreq)
+static void cli_pull_chunk_ship(struct cli_pull_chunk *chunk)
 {
-       struct tevent_req *req = tevent_req_callback_data(
-               subreq, struct tevent_req);
-       struct cli_pull_state *state = tevent_req_data(
-               req, struct cli_pull_state);
-       struct cli_pull_subreq *pull_subreq = NULL;
-       NTSTATUS status;
-       int i;
+       struct tevent_req *req = chunk->req;
+       struct cli_pull_state *state =
+               tevent_req_data(req,
+               struct cli_pull_state);
+       bool ok;
+       off_t ofs;
+       size_t size;
 
-       for (i = 0; i < state->num_reqs; i++) {
-               pull_subreq = &state->reqs[i];
-               if (subreq == pull_subreq->req) {
-                       break;
+       if (chunk->done) {
+               NTSTATUS status;
+
+               if (chunk != state->chunks) {
+                       /*
+                        * this chunk is not the
+                        * first one in the list.
+                        *
+                        * which means we should not
+                        * push it into the sink yet.
+                        */
+                       return;
+               }
+
+               if (chunk->tmp_size == 0) {
+                       /*
+                        * we got a short read, we're done
+                        */
+                       tevent_req_done(req);
+                       return;
                }
+
+               status = state->sink((char *)chunk->buf,
+                                    chunk->tmp_size,
+                                    state->priv);
+               if (tevent_req_nterror(req, status)) {
+                       return;
+               }
+               state->pushed += chunk->tmp_size;
+
+               if (chunk->tmp_size < chunk->total_size) {
+                       /*
+                        * we got a short read, we're done
+                        */
+                       tevent_req_done(req);
+                       return;
+               }
+
+               DLIST_REMOVE(state->chunks, chunk);
+               SMB_ASSERT(state->num_chunks > 0);
+               state->num_chunks--;
+               TALLOC_FREE(chunk);
+
+               return;
        }
-       if (i == state->num_reqs) {
-               /* Huh -- received something we did not send?? */
-               tevent_req_nterror(req, NT_STATUS_INTERNAL_ERROR);
+
+       if (chunk->subreq != NULL) {
                return;
        }
 
-       status = cli_readall_recv(subreq, &pull_subreq->received,
-                                 &pull_subreq->buf);
-       if (!NT_STATUS_IS_OK(status)) {
-               tevent_req_nterror(state->req, status);
+       SMB_ASSERT(state->num_waiting > 0);
+
+       ofs = chunk->ofs + chunk->tmp_size;
+       size = chunk->total_size - chunk->tmp_size;
+
+       ok = smb1cli_conn_req_possible(state->cli->conn);
+       if (!ok) {
                return;
        }
 
-       /*
-        * This loop is the one to take care of out-of-order replies. All
-        * pending requests are in state->reqs, state->reqs[top_req] is the
-        * one that is to be pushed next. If however a request later than
-        * top_req is replied to, then we can't push yet. If top_req is
-        * replied to at a later point then, we need to push all the finished
-        * requests.
-        */
+       chunk->subreq = cli_read_andx_send(chunk,
+                                          state->ev,
+                                          state->cli,
+                                          state->fnum,
+                                          ofs,
+                                          size);
+       if (tevent_req_nomem(chunk->subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(chunk->subreq,
+                               cli_pull_chunk_done,
+                               chunk);
 
-       while (state->reqs[state->top_req].req != NULL) {
-               struct cli_pull_subreq *top_subreq;
+       state->num_waiting--;
+       return;
+}
 
-               DEBUG(11, ("cli_pull_read_done: top_req = %d\n",
-                          state->top_req));
+static void cli_pull_chunk_done(struct tevent_req *subreq)
+{
+       struct cli_pull_chunk *chunk =
+               tevent_req_callback_data(subreq,
+               struct cli_pull_chunk);
+       struct tevent_req *req = chunk->req;
+       struct cli_pull_state *state =
+               tevent_req_data(req,
+               struct cli_pull_state);
+       NTSTATUS status;
+       size_t expected = chunk->total_size - chunk->tmp_size;
+       ssize_t received;
+       uint8_t *buf = NULL;
 
-               top_subreq = &state->reqs[state->top_req];
+       chunk->subreq = NULL;
 
-               if (tevent_req_is_in_progress(top_subreq->req)) {
-                       DEBUG(11, ("cli_pull_read_done: top request not yet "
-                                  "done\n"));
-                       return;
-               }
+       status = cli_read_andx_recv(subreq, &received, &buf);
+       if (NT_STATUS_EQUAL(status, NT_STATUS_END_OF_FILE)) {
+               received = 0;
+               status = NT_STATUS_OK;
+       }
+       if (tevent_req_nterror(req, status)) {
+               return;
+       }
+
+       if (received > expected) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
+       }
 
-               DEBUG(10, ("cli_pull_read_done: Pushing %d bytes, %d already "
-                          "pushed\n", (int)top_subreq->received,
-                          (int)state->pushed));
+       if (received == 0) {
+               /*
+                * We got EOF we're done
+                */
+               chunk->done = true;
+               cli_pull_setup_chunks(req);
+               return;
+       }
 
-               status = state->sink((char *)top_subreq->buf,
-                                    top_subreq->received, state->priv);
-               if (tevent_req_nterror(state->req, status)) {
+       if (received == chunk->total_size) {
+               /*
+                * We got it in the first run.
+                *
+                * We don't call TALLOC_FREE(subreq)
+                * here and keep the returned buffer.
+                */
+               chunk->buf = buf;
+       } else if (chunk->buf == NULL) {
+               chunk->buf = talloc_array(chunk, uint8_t, chunk->total_size);
+               if (tevent_req_nomem(chunk->buf, req)) {
                        return;
                }
-               state->pushed += top_subreq->received;
-
-               TALLOC_FREE(state->reqs[state->top_req].req);
-
-               if (state->requested < state->size) {
-                       struct tevent_req *new_req;
-                       off_t size_left;
-                       size_t request_thistime;
-
-                       size_left = state->size - state->requested;
-                       request_thistime = MIN(size_left, state->chunk_size);
-
-                       DEBUG(10, ("cli_pull_read_done: Requesting %d bytes "
-                                  "at %d, position %d\n",
-                                  (int)request_thistime,
-                                  (int)(state->start_offset
-                                        + state->requested),
-                                  state->top_req));
-
-                       new_req = cli_readall_send(
-                               state->reqs, state->ev, state->cli,
-                               state->fnum,
-                               state->start_offset + state->requested,
-                               request_thistime);
-
-                       if (tevent_req_nomem(new_req, state->req)) {
-                               return;
-                       }
-                       tevent_req_set_callback(new_req, cli_pull_read_done,
-                                               req);
-
-                       state->reqs[state->top_req].req = new_req;
-                       state->requested += request_thistime;
-               }
+       }
 
-               state->top_req = (state->top_req+1) % state->num_reqs;
+       if (received != chunk->total_size) {
+               uint8_t *p = chunk->buf + chunk->tmp_size;
+               memcpy(p, buf, received);
+               TALLOC_FREE(subreq);
        }
 
-       tevent_req_done(req);
+       chunk->tmp_size += received;
+
+       if (chunk->tmp_size == chunk->total_size) {
+               chunk->done = true;
+       } else {
+               state->num_waiting++;
+       }
+
+       cli_pull_setup_chunks(req);
 }
 
 NTSTATUS cli_pull_recv(struct tevent_req *req, off_t *received)
@@ -665,9 +738,11 @@ NTSTATUS cli_pull_recv(struct tevent_req *req, off_t *received)
        NTSTATUS status;
 
        if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
                return status;
        }
        *received = state->pushed;
+       tevent_req_received(req);
        return NT_STATUS_OK;
 }