s3:libsmb: rewrite cli_push* to use smb1cli_conn_req_possible()
authorStefan Metzmacher <metze@samba.org>
Tue, 13 Aug 2013 12:10:59 +0000 (14:10 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 15 Aug 2013 07:07:06 +0000 (09:07 +0200)
This works out if it's possible to ship the next request dynamically
instead of relying on fixed values.

The default window size is 16 MByte.

We limit the number of outstanding chunks/requests to 256.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/libsmb/clireadwrite.c

index 47e7f1bfd4c65880df772d3f68397fab80715733..550c52b15e0690b9d03c100c304f81f72b8a9379 100644 (file)
@@ -1092,13 +1092,7 @@ NTSTATUS cli_writeall(struct cli_state *cli, uint16_t fnum, uint16_t mode,
        return status;
 }
 
-struct cli_push_write_state {
-       struct tevent_req *req;/* This is the main request! Not the subreq */
-       uint32_t idx;
-       off_t ofs;
-       uint8_t *buf;
-       size_t size;
-};
+struct cli_push_chunk;
 
 struct cli_push_state {
        struct tevent_context *ev;
@@ -1106,7 +1100,6 @@ struct cli_push_state {
        uint16_t fnum;
        uint16_t mode;
        off_t start_offset;
-       size_t window_size;
 
        size_t (*source)(uint8_t *buf, size_t n, void *priv);
        void *priv;
@@ -1118,62 +1111,32 @@ struct cli_push_state {
 
        /*
         * Outstanding requests
+        *
+        * The maximum is 256:
+        * - which would be a window of 256 MByte
+        *   for SMB2 with multi-credit
+        *   or smb1 unix extentions.
         */
-       uint32_t pending;
-       uint16_t max_reqs;
-       uint32_t num_reqs;
-       struct cli_push_write_state **reqs;
+       uint16_t max_chunks;
+       uint16_t num_chunks;
+       uint16_t num_waiting;
+       struct cli_push_chunk *chunks;
 };
 
-static void cli_push_written(struct tevent_req *req);
-
-static bool cli_push_write_setup(struct tevent_req *req,
-                                struct cli_push_state *state,
-                                uint32_t idx)
-{
-       struct cli_push_write_state *substate;
+struct cli_push_chunk {
+       struct cli_push_chunk *prev, *next;
+       struct tevent_req *req;/* This is the main request! Not the subreq */
        struct tevent_req *subreq;
+       off_t ofs;
+       uint8_t *buf;
+       size_t total_size;
+       size_t tmp_size;
+       bool done;
+};
 
-       substate = talloc(state->reqs, struct cli_push_write_state);
-       if (!substate) {
-               return false;
-       }
-       substate->req = req;
-       substate->idx = idx;
-       substate->ofs = state->next_offset;
-       substate->buf = talloc_array(substate, uint8_t, state->chunk_size);
-       if (!substate->buf) {
-               talloc_free(substate);
-               return false;
-       }
-       substate->size = state->source(substate->buf,
-                                      state->chunk_size,
-                                      state->priv);
-       if (substate->size == 0) {
-               state->eof = true;
-               /* nothing to send */
-               talloc_free(substate);
-               return true;
-       }
-
-       subreq = cli_writeall_send(substate,
-                                  state->ev, state->cli,
-                                  state->fnum, state->mode,
-                                  substate->buf,
-                                  substate->ofs,
-                                  substate->size);
-       if (!subreq) {
-               talloc_free(substate);
-               return false;
-       }
-       tevent_req_set_callback(subreq, cli_push_written, substate);
-
-       state->reqs[idx] = substate;
-       state->pending += 1;
-       state->next_offset += substate->size;
-
-       return true;
-}
+static void cli_push_setup_chunks(struct tevent_req *req);
+static void cli_push_chunk_ship(struct cli_push_chunk *chunk);
+static void cli_push_chunk_done(struct tevent_req *subreq);
 
 struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                                 struct cli_state *cli,
@@ -1185,8 +1148,8 @@ struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
 {
        struct tevent_req *req;
        struct cli_push_state *state;
-       uint32_t i;
        size_t page_size = 1024;
+       uint64_t tmp64;
 
        req = tevent_req_create(mem_ctx, &state, struct cli_push_state);
        if (req == NULL) {
@@ -1199,8 +1162,6 @@ struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
        state->mode = mode;
        state->source = source;
        state->priv = priv;
-       state->eof = false;
-       state->pending = 0;
        state->next_offset = start_offset;
 
        state->chunk_size = cli_write_max_bufsize(cli, mode, 14);
@@ -1208,77 +1169,202 @@ struct tevent_req *cli_push_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
                state->chunk_size &= ~(page_size - 1);
        }
 
-       state->max_reqs = smbXcli_conn_max_requests(cli->conn);
-
        if (window_size == 0) {
-               window_size = state->max_reqs * state->chunk_size;
+               /*
+                * We use 16 MByte as default window size.
+                */
+               window_size = 16 * 1024 * 1024;
        }
-       state->num_reqs = window_size/state->chunk_size;
+
+       tmp64 = window_size/state->chunk_size;
        if ((window_size % state->chunk_size) > 0) {
-               state->num_reqs += 1;
+               tmp64 += 1;
        }
-       state->num_reqs = MIN(state->num_reqs, state->max_reqs);
-       state->num_reqs = MAX(state->num_reqs, 1);
+       tmp64 = MAX(tmp64, 1);
+       tmp64 = MIN(tmp64, 256);
+       state->max_chunks = tmp64;
 
-       state->reqs = talloc_zero_array(state, struct cli_push_write_state *,
-                                       state->num_reqs);
-       if (state->reqs == NULL) {
-               goto failed;
+       /*
+        * We defer the callback because of the complex
+        * substate/subfunction logic
+        */
+       tevent_req_defer_callback(req, ev);
+
+       cli_push_setup_chunks(req);
+       if (!tevent_req_is_in_progress(req)) {
+               return tevent_req_post(req, ev);
        }
 
-       for (i=0; i<state->num_reqs; i++) {
-               if (!cli_push_write_setup(req, state, i)) {
-                       goto failed;
+       return req;
+}
+
+static void cli_push_setup_chunks(struct tevent_req *req)
+{
+       struct cli_push_state *state =
+               tevent_req_data(req,
+               struct cli_push_state);
+       struct cli_push_chunk *chunk, *next = NULL;
+       size_t i;
+
+       for (chunk = state->chunks; chunk; chunk = next) {
+               /*
+                * Note that chunk might be removed from this call.
+                */
+               next = chunk->next;
+               cli_push_chunk_ship(chunk);
+               if (!tevent_req_is_in_progress(req)) {
+                       return;
+               }
+       }
+
+       for (i = state->num_chunks; i < state->max_chunks; i++) {
+
+               if (state->num_waiting > 0) {
+                       return;
                }
 
                if (state->eof) {
                        break;
                }
+
+               chunk = talloc_zero(state, struct cli_push_chunk);
+               if (tevent_req_nomem(chunk, req)) {
+                       return;
+               }
+               chunk->req = req;
+               chunk->ofs = state->next_offset;
+               chunk->buf = talloc_array(chunk,
+                                         uint8_t,
+                                         state->chunk_size);
+               if (tevent_req_nomem(chunk->buf, req)) {
+                       return;
+               }
+               chunk->total_size = state->source(chunk->buf,
+                                                 state->chunk_size,
+                                                 state->priv);
+               if (chunk->total_size == 0) {
+                       /* nothing to send */
+                       talloc_free(chunk);
+                       state->eof = true;
+                       break;
+               }
+               state->next_offset += chunk->total_size;
+
+               DLIST_ADD_END(state->chunks, chunk, NULL);
+               state->num_chunks++;
+               state->num_waiting++;
+
+               cli_push_chunk_ship(chunk);
+               if (!tevent_req_is_in_progress(req)) {
+                       return;
+               }
        }
 
-       if (state->pending == 0) {
-               tevent_req_done(req);
-               return tevent_req_post(req, ev);
+       if (!state->eof) {
+               return;
        }
 
-       return req;
+       if (state->num_chunks > 0) {
+               return;
+       }
+
+       tevent_req_done(req);
+}
+
+static void cli_push_chunk_ship(struct cli_push_chunk *chunk)
+{
+       struct tevent_req *req = chunk->req;
+       struct cli_push_state *state =
+               tevent_req_data(req,
+               struct cli_push_state);
+       bool ok;
+       const uint8_t *buf;
+       off_t ofs;
+       size_t size;
+
+       if (chunk->done) {
+               DLIST_REMOVE(state->chunks, chunk);
+               SMB_ASSERT(state->num_chunks > 0);
+               state->num_chunks--;
+               TALLOC_FREE(chunk);
+
+               return;
+       }
+
+       if (chunk->subreq != NULL) {
+               return;
+       }
+
+       SMB_ASSERT(state->num_waiting > 0);
+
+       buf = chunk->buf + chunk->tmp_size;
+       ofs = chunk->ofs + chunk->tmp_size;
+       size = chunk->total_size - chunk->tmp_size;
+
+       ok = smb1cli_conn_req_possible(state->cli->conn);
+       if (!ok) {
+               return;
+       }
+
+       chunk->subreq = cli_write_andx_send(chunk,
+                                           state->ev,
+                                           state->cli,
+                                           state->fnum,
+                                           state->mode,
+                                           buf,
+                                           ofs,
+                                           size);
+       if (tevent_req_nomem(chunk->subreq, req)) {
+               return;
+       }
+       tevent_req_set_callback(chunk->subreq,
+                               cli_push_chunk_done,
+                               chunk);
 
- failed:
-       tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
-       return tevent_req_post(req, ev);
+       state->num_waiting--;
+       return;
 }
 
-static void cli_push_written(struct tevent_req *subreq)
+static void cli_push_chunk_done(struct tevent_req *subreq)
 {
-       struct cli_push_write_state *substate = tevent_req_callback_data(
-               subreq, struct cli_push_write_state);
-       struct tevent_req *req = substate->req;
-       struct cli_push_state *state = tevent_req_data(
-               req, struct cli_push_state);
+       struct cli_push_chunk *chunk =
+               tevent_req_callback_data(subreq,
+               struct cli_push_chunk);
+       struct tevent_req *req = chunk->req;
+       struct cli_push_state *state =
+               tevent_req_data(req,
+               struct cli_push_state);
        NTSTATUS status;
-       uint32_t idx = substate->idx;
+       size_t expected = chunk->total_size - chunk->tmp_size;
+       size_t written;
 
-       state->reqs[idx] = NULL;
-       state->pending -= 1;
+       chunk->subreq = NULL;
 
-       status = cli_writeall_recv(subreq, NULL);
+       status = cli_write_andx_recv(subreq, &written);
        TALLOC_FREE(subreq);
-       TALLOC_FREE(substate);
        if (tevent_req_nterror(req, status)) {
                return;
        }
 
-       if (!state->eof) {
-               if (!cli_push_write_setup(req, state, idx)) {
-                       tevent_req_nterror(req, NT_STATUS_NO_MEMORY);
-                       return;
-               }
+       if (written > expected) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
+               return;
        }
 
-       if (state->pending == 0) {
-               tevent_req_done(req);
+       if (written == 0) {
+               tevent_req_nterror(req, NT_STATUS_INVALID_NETWORK_RESPONSE);
                return;
        }
+
+       chunk->tmp_size += written;
+
+       if (chunk->tmp_size == chunk->total_size) {
+               chunk->done = true;
+       } else {
+               state->num_waiting++;
+       }
+
+       cli_push_setup_chunks(req);
 }
 
 NTSTATUS cli_push_recv(struct tevent_req *req)