s3-rpc: convert process_complete_pdu and callers async
authorDavid Disseldorp <ddiss@samba.org>
Wed, 25 Apr 2012 13:15:55 +0000 (15:15 +0200)
committerDavid Disseldorp <ddiss@samba.org>
Mon, 15 Apr 2013 16:15:17 +0000 (18:15 +0200)
source3/rpc_server/rpc_server.c
source3/rpc_server/rpc_server.h
source3/rpc_server/srv_pipe.c
source3/rpc_server/srv_pipe_hnd.c

index d64b13722b3396a1c4b0e324c68650ed11a93a73..33bdff1b7712e229c6f9bbfe8a699e266ea50759 100644 (file)
@@ -355,6 +355,7 @@ void named_pipe_accept_function(struct tevent_context *ev_ctx,
 }
 
 static void named_pipe_packet_process(struct tevent_req *subreq);
+static void named_pipe_packet_respond(struct tevent_req *subreq);
 static void named_pipe_packet_done(struct tevent_req *subreq);
 
 static void named_pipe_accept_done(struct tevent_req *subreq)
@@ -421,16 +422,12 @@ static void named_pipe_packet_process(struct tevent_req *subreq)
 {
        struct named_pipe_client *npc =
                tevent_req_callback_data(subreq, struct named_pipe_client);
-       struct _output_data *out = &npc->p->out_data;
        DATA_BLOB recv_buffer = data_blob_null;
        struct ncacn_packet *pkt;
        NTSTATUS status;
        ssize_t data_left;
        ssize_t data_used;
        char *data;
-       uint32_t to_send;
-       size_t i;
-       bool ok;
 
        status = dcerpc_read_ncacn_packet_recv(subreq, npc, &pkt, &recv_buffer);
        TALLOC_FREE(subreq);
@@ -456,15 +453,67 @@ static void named_pipe_packet_process(struct tevent_req *subreq)
                data += data_used;
 
                if (pdu_ready == true) {
-                       process_complete_pdu(npc->p);
-                       break;
+                       /* XXX received PDUs are processed one at a time */
+                       if (data_left > 0) {
+                               DEBUG(0, ("Unacceptable, more than one PDU\n"));
+                               status = NT_STATUS_INTERNAL_ERROR;
+                               goto fail;
+                       }
+                       /* Do not leak this buffer, npc is a long lived context */
+                       talloc_free(recv_buffer.data);
+                       talloc_free(pkt);
+
+                       subreq = process_complete_pdu_send(npc->ev, npc->p);
+                       if (subreq == NULL) {
+                               status = NT_STATUS_NO_MEMORY;
+                               goto fail;
+                       }
+                       tevent_req_set_callback(subreq,
+                                               named_pipe_packet_respond,
+                                               npc);
+                       return;
                }
        }
 
+       DEBUG(0, ("No complete PDU to process\n"));
+
        /* Do not leak this buffer, npc is a long lived context */
        talloc_free(recv_buffer.data);
        talloc_free(pkt);
 
+       /* Wait for the next packet */
+       subreq = dcerpc_read_ncacn_packet_send(npc, npc->ev, npc->tstream);
+       if (!subreq) {
+               DEBUG(2, ("Failed to start receving packets\n"));
+               status = NT_STATUS_NO_MEMORY;
+               goto fail;
+       }
+       tevent_req_set_callback(subreq, named_pipe_packet_process, npc);
+       return;
+
+fail:
+       DEBUG(2, ("Fatal error(%s). "
+                 "Terminating client(%s) connection!\n",
+                 nt_errstr(status), npc->client_name));
+       /* terminate client connection */
+       talloc_free(npc);
+       return;
+}
+
+static void named_pipe_packet_respond(struct tevent_req *subreq)
+{
+       struct named_pipe_client *npc =
+               tevent_req_callback_data(subreq, struct named_pipe_client);
+       uint32_t to_send;
+       size_t i;
+       bool ok;
+       struct _output_data *out;
+       NTSTATUS status;
+
+       process_complete_pdu_recv(subreq);
+       TALLOC_FREE(subreq);
+
+       out = &npc->p->out_data;
        /* this is needed because of the way DCERPC Binds work in
         * the RPC marshalling code */
        to_send = out->frag.length - out->current_pdu_sent;
@@ -948,6 +997,7 @@ struct dcerpc_ncacn_conn {
 };
 
 static void dcerpc_ncacn_packet_process(struct tevent_req *subreq);
+static void dcerpc_ncacn_packet_respond(struct tevent_req *subreq);
 static void dcerpc_ncacn_packet_done(struct tevent_req *subreq);
 
 void dcerpc_ncacn_accept(struct tevent_context *ev_ctx,
@@ -1130,12 +1180,10 @@ static void dcerpc_ncacn_packet_process(struct tevent_req *subreq)
        struct dcerpc_ncacn_conn *ncacn_conn =
                tevent_req_callback_data(subreq, struct dcerpc_ncacn_conn);
 
-       struct _output_data *out = &ncacn_conn->p->out_data;
        DATA_BLOB recv_buffer = data_blob_null;
        struct ncacn_packet *pkt;
        ssize_t data_left;
        ssize_t data_used;
-       uint32_t to_send;
        char *data;
        NTSTATUS status;
        bool ok;
@@ -1170,8 +1218,25 @@ static void dcerpc_ncacn_packet_process(struct tevent_req *subreq)
                data += data_used;
 
                if (pdu_ready == true) {
-                       process_complete_pdu(ncacn_conn->p);
-                       break;
+                       /* XXX received PDUs are processed one at a time */
+                       if (data_left > 0) {
+                               DEBUG(0, ("Unacceptable, more than one PDU\n"));
+                               status = NT_STATUS_INTERNAL_ERROR;
+                               goto fail;
+                       }
+                       talloc_free(recv_buffer.data);
+                       talloc_free(pkt);
+
+                       subreq = process_complete_pdu_send(ncacn_conn->ev_ctx,
+                                                          ncacn_conn->p);
+                       if (subreq == NULL) {
+                               status = NT_STATUS_NO_MEMORY;
+                               goto fail;
+                       }
+                       tevent_req_set_callback(subreq,
+                                               dcerpc_ncacn_packet_respond,
+                                               ncacn_conn);
+                       return;
                }
        }
 
@@ -1179,6 +1244,40 @@ static void dcerpc_ncacn_packet_process(struct tevent_req *subreq)
        talloc_free(recv_buffer.data);
        talloc_free(pkt);
 
+       /* Wait for the next packet */
+       subreq = dcerpc_read_ncacn_packet_send(ncacn_conn,
+                                              ncacn_conn->ev_ctx,
+                                              ncacn_conn->tstream);
+       if (subreq == NULL) {
+               DEBUG(2, ("Failed to start receving packets\n"));
+               status = NT_STATUS_NO_MEMORY;
+               goto fail;
+       }
+       tevent_req_set_callback(subreq, dcerpc_ncacn_packet_process, ncacn_conn);
+       return;
+
+fail:
+       DEBUG(3, ("Terminating client(%s) connection! - '%s'\n",
+                 ncacn_conn->client_name, nt_errstr(status)));
+
+       /* Terminate client connection */
+       talloc_free(ncacn_conn);
+       return;
+}
+
+void dcerpc_ncacn_packet_respond(struct tevent_req *subreq)
+{
+       struct dcerpc_ncacn_conn *ncacn_conn =
+               tevent_req_callback_data(subreq, struct dcerpc_ncacn_conn);
+
+       struct _output_data *out;
+       uint32_t to_send;
+       NTSTATUS status;
+
+       process_complete_pdu_recv(subreq);
+       TALLOC_FREE(subreq);
+
+       out = &ncacn_conn->p->out_data;
        /*
         * This is needed because of the way DCERPC binds work in the RPC
         * marshalling code
@@ -1213,7 +1312,7 @@ static void dcerpc_ncacn_packet_process(struct tevent_req *subreq)
         * a full request, and need to wait for more data from the client
         */
        while (out->data_sent_length < out->rdata.length) {
-               ok = create_next_pdu(ncacn_conn->p);
+               bool ok = create_next_pdu(ncacn_conn->p);
                if (!ok) {
                        DEBUG(3, ("Failed to create next PDU!\n"));
                        status = NT_STATUS_UNEXPECTED_IO_ERROR;
index 308354d834772b1ea462960e03d3e4f97581c888..73575093cb202ed7fd95e2c337bfcb87206fdecb 100644 (file)
@@ -26,7 +26,9 @@ typedef bool (*dcerpc_ncacn_disconnect_fn)(struct pipes_struct *p);
 typedef void (named_pipe_termination_fn)(void *private_data);
 
 void set_incoming_fault(struct pipes_struct *p);
-void process_complete_pdu(struct pipes_struct *p);
+struct tevent_req *process_complete_pdu_send(struct tevent_context *ev,
+                                            struct pipes_struct *p);
+void process_complete_pdu_recv(struct tevent_req *req);
 int create_named_pipe_socket(const char *pipe_name);
 bool setup_named_pipe_socket(const char *pipe_name,
                             struct tevent_context *ev_ctx,
index 4a7f4ab8c462b449b1d07b26c12dd8fb8cb0bc1e..0883d36145b7074904b623758d22795ad0291081 100644 (file)
@@ -1579,25 +1579,41 @@ static bool process_request_pdu(struct pipes_struct *p, struct ncacn_packet *pkt
        return True;
 }
 
+struct process_complete_pdu_state {
+       struct pipes_struct *p;
+       bool reply;
+       NTSTATUS status;
+};
 /****************************************************************************
  Processes a finished PDU stored in p->in_data.pdu.
 ****************************************************************************/
 
-void process_complete_pdu(struct pipes_struct *p)
+struct tevent_req *process_complete_pdu_send(struct tevent_context *ev,
+                                            struct pipes_struct *p)
 {
-       struct ncacn_packet *pkt = NULL;
+       struct tevent_req *req;
+       struct ncacn_packet *pkt;
        NTSTATUS status;
-       bool reply = False;
+       struct process_complete_pdu_state *state;
+
+       req = tevent_req_create(p->mem_ctx, &state,
+                               struct process_complete_pdu_state);
+       if (req == NULL) {
+               DEBUG(0, ("Out of memory!\n"));
+               return NULL;
+       }
+       state->p = p;
+       state->reply = false;
 
-       if(p->fault_state) {
+       if (p->fault_state) {
                DEBUG(10,("RPC connection in fault state.\n"));
-               goto done;
+               tevent_req_done(req);
+               return tevent_req_post(req, ev);
        }
 
-       pkt = talloc(p->mem_ctx, struct ncacn_packet);
-       if (!pkt) {
-               DEBUG(0, ("Out of memory!\n"));
-               goto done;
+       pkt = talloc(state, struct ncacn_packet);
+       if (tevent_req_nomem(pkt, req)) {
+               return tevent_req_post(req, ev);
        }
 
        /*
@@ -1611,12 +1627,13 @@ void process_complete_pdu(struct pipes_struct *p)
        }
        DEBUG(10, ("PDU is in %s Endian format!\n", p->endian?"Big":"Little"));
 
-       status = dcerpc_pull_ncacn_packet(pkt, &p->in_data.pdu,
+       status = dcerpc_pull_ncacn_packet(state, &p->in_data.pdu,
                                          pkt, p->endian);
        if (!NT_STATUS_IS_OK(status)) {
                DEBUG(0, ("Failed to unmarshal rpc packet: %s!\n",
                          nt_errstr(status)));
-               goto done;
+               tevent_req_done(req);
+               return tevent_req_post(req, ev);
        }
 
        /* Store the call_id */
@@ -1626,7 +1643,7 @@ void process_complete_pdu(struct pipes_struct *p)
 
        switch (pkt->ptype) {
        case DCERPC_PKT_REQUEST:
-               reply = process_request_pdu(p, pkt);
+               state->reply = process_request_pdu(p, pkt);
                break;
 
        case DCERPC_PKT_PING: /* CL request - ignore... */
@@ -1657,7 +1674,7 @@ void process_complete_pdu(struct pipes_struct *p)
                 * We assume that a pipe bind is only in one pdu.
                 */
                if (pipe_init_outgoing_data(p)) {
-                       reply = api_pipe_bind_req(p, pkt);
+                       state->reply = api_pipe_bind_req(p, pkt);
                }
                break;
 
@@ -1674,7 +1691,7 @@ void process_complete_pdu(struct pipes_struct *p)
                 * We assume that a pipe bind is only in one pdu.
                 */
                if (pipe_init_outgoing_data(p)) {
-                       reply = api_pipe_alter_context(p, pkt);
+                       state->reply = api_pipe_alter_context(p, pkt);
                }
                break;
 
@@ -1688,7 +1705,7 @@ void process_complete_pdu(struct pipes_struct *p)
                 * The third packet in an auth exchange.
                 */
                if (pipe_init_outgoing_data(p)) {
-                       reply = api_pipe_bind_auth3(p, pkt);
+                       state->reply = api_pipe_bind_auth3(p, pkt);
                }
                break;
 
@@ -1707,16 +1724,17 @@ void process_complete_pdu(struct pipes_struct *p)
                 * If we ever did we'd have to send a cancel_ack reply.
                 * For now, just free all client data and continue
                 * processing. */
-               reply = True;
+               state->reply = true;
                break;
 
+               /* TODO */
 #if 0
                /* Enable this if we're doing async rpc. */
                /* We must check the outstanding callid matches. */
                if (pipe_init_outgoing_data(p)) {
                        /* Send a cancel_ack PDU reply. */
                        /* We should probably check the auth-verifier here. */
-                       reply = setup_cancel_ack_reply(p, pkt);
+                       state->reply = setup_cancel_ack_reply(p, pkt);
                }
                break;
 #endif
@@ -1727,7 +1745,7 @@ void process_complete_pdu(struct pipes_struct *p)
                 * processing. */
                DEBUG(3, ("process_complete_pdu: DCERPC_PKT_ORPHANED."
                          " Abandoning rpc call.\n"));
-               reply = True;
+               state->reply = true;
                break;
 
        default:
@@ -1737,21 +1755,26 @@ void process_complete_pdu(struct pipes_struct *p)
                break;
        }
 
-done:
-       if (!reply) {
+       tevent_req_done(req);
+       return tevent_req_post(req, ev);
+}
+
+void process_complete_pdu_recv(struct tevent_req *req)
+{
+       struct process_complete_pdu_state *state
+               = tevent_req_data(req, struct process_complete_pdu_state);
+
+       if (!state->reply) {
                DEBUG(3,("DCE/RPC fault sent!"));
-               set_incoming_fault(p);
-               setup_fault_pdu(p, NT_STATUS(DCERPC_FAULT_OP_RNG_ERROR));
-               TALLOC_FREE(pkt);
+               set_incoming_fault(state->p);
+               setup_fault_pdu(state->p, NT_STATUS(DCERPC_FAULT_OP_RNG_ERROR));
        } else {
                /*
                 * Reset the lengths. We're ready for a new pdu.
                 */
-               TALLOC_FREE(p->in_data.pdu.data);
-               p->in_data.pdu_needed_len = 0;
-               p->in_data.pdu.length = 0;
+               TALLOC_FREE(state->p->in_data.pdu.data);
+               state->p->in_data.pdu_needed_len = 0;
+               state->p->in_data.pdu.length = 0;
        }
-
-       TALLOC_FREE(pkt);
+       tevent_req_received(req);
 }
-
index 937e81beb8dd1c848820a0b0e2b785e090013b2b..62cd8762a8a46a0aead55c470dc5df27b8545c88 100644 (file)
@@ -231,13 +231,28 @@ ssize_t process_incoming_data(struct pipes_struct *p, const char *data,
        return (ssize_t)data_to_copy;
 }
 
+struct write_to_internal_pipe_state {
+       size_t data_processed;
+};
+static void write_to_internal_pipe_done(struct tevent_req *subreq);
+
 /****************************************************************************
  Accepts incoming data on an internal rpc pipe.
 ****************************************************************************/
 
-static ssize_t write_to_internal_pipe(struct pipes_struct *p, const char *data, size_t n)
+static struct tevent_req *write_to_internal_pipe_send(TALLOC_CTX *mem_ctx,
+                                                     struct tevent_context *ev,
+                                                     struct pipes_struct *p,
+                                                     struct iovec *buf)
 {
-       size_t data_left = n;
+       const char *data = buf->iov_base;
+       size_t data_left = buf->iov_len;
+       struct write_to_internal_pipe_state *state;
+       struct tevent_req *req = tevent_req_create(mem_ctx, &state,
+                                       struct write_to_internal_pipe_state);
+       if (req == NULL) {
+               return NULL;
+       }
 
        while(data_left) {
                ssize_t data_used;
@@ -251,20 +266,65 @@ static ssize_t write_to_internal_pipe(struct pipes_struct *p, const char *data,
                DEBUG(10, ("write_to_pipe: data_used = %d\n",
                           (int)data_used));
 
-               if(data_used < 0) {
-                       return -1;
+               if (data_used < 0) {
+                       tevent_req_nterror(req, NT_STATUS_UNEXPECTED_IO_ERROR);
+                       return tevent_req_post(req, ev);
                }
 
                data_left -= data_used;
                data += data_used;
 
                if (pdu_ready == true) {
-                       process_complete_pdu(p);
-                       break;
+                       struct tevent_req *subreq;
+                       /* XXX received PDUs are processed one at a time */
+                       if (data_left > 0) {
+                               DEBUG(1, ("Unacceptable, more than one PDU\n"));
+                               tevent_req_nterror(req,
+                                               NT_STATUS_UNEXPECTED_IO_ERROR);
+                               return tevent_req_post(req, ev);
+                       }
+                       subreq = process_complete_pdu_send(ev, p);
+                       if (tevent_req_nomem(subreq, req)) {
+                               return tevent_req_post(req, ev);
+                       }
+                       state->data_processed = buf->iov_len;
+                       tevent_req_set_callback(subreq,
+                                               write_to_internal_pipe_done,
+                                               req);
+                       return req;
                }
        }
 
-       return n;
+       state->data_processed = buf->iov_len;
+       tevent_req_done(req);
+       return tevent_req_post(req, ev);
+}
+
+static void write_to_internal_pipe_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req =
+               tevent_req_callback_data(subreq, struct tevent_req);
+
+       process_complete_pdu_recv(subreq);
+       /* data_processed already set, p in fault state on error */
+       tevent_req_done(req);
+}
+
+static NTSTATUS write_to_internal_pipe_recv(struct tevent_req *req,
+                                           ssize_t *data_processed)
+{
+       struct write_to_internal_pipe_state *state
+               = tevent_req_data(req, struct write_to_internal_pipe_state);
+       NTSTATUS status;
+
+       if (tevent_req_is_nterror(req, &status)) {
+               tevent_req_received(req);
+               return status;
+       }
+
+       *data_processed = state->data_processed;
+       tevent_req_received(req);
+       return NT_STATUS_OK;
 }
 
 /****************************************************************************
@@ -511,9 +571,13 @@ bool np_read_in_progress(struct fake_file_handle *handle)
 
 struct np_write_state {
        struct event_context *ev;
-       struct np_proxy_state *p;
        struct iovec iov;
        ssize_t nwritten;
+       enum FAKE_FILE_TYPE type;
+       union {
+               struct pipes_struct *ps;
+               struct np_proxy_state *np;
+       };
 };
 
 static void np_write_done(struct tevent_req *subreq);
@@ -540,30 +604,32 @@ struct tevent_req *np_write_send(TALLOC_CTX *mem_ctx, struct event_context *ev,
                goto post_status;
        }
 
+       state->ev = ev;
+       state->type = handle->type;
+       state->iov.iov_base = discard_const_p(void, data);
+       state->iov.iov_len = len;
        if (handle->type == FAKE_FILE_TYPE_NAMED_PIPE) {
-               struct pipes_struct *p = talloc_get_type_abort(
+               struct tevent_req *subreq;
+               state->ps = talloc_get_type_abort(
                        handle->private_data, struct pipes_struct);
 
-               state->nwritten = write_to_internal_pipe(p, (const char *)data, len);
-
-               status = (state->nwritten >= 0)
-                       ? NT_STATUS_OK : NT_STATUS_UNEXPECTED_IO_ERROR;
-               goto post_status;
+               subreq = write_to_internal_pipe_send(state, ev, state->ps,
+                                                    &state->iov);
+               if (subreq == NULL) {
+                       goto fail;
+               }
+               tevent_req_set_callback(subreq, np_write_done, req);
+               return req;
        }
 
        if (handle->type == FAKE_FILE_TYPE_NAMED_PIPE_PROXY) {
-               struct np_proxy_state *p = talloc_get_type_abort(
-                       handle->private_data, struct np_proxy_state);
                struct tevent_req *subreq;
-
-               state->ev = ev;
-               state->p = p;
-               state->iov.iov_base = discard_const_p(void, data);
-               state->iov.iov_len = len;
+               state->np = talloc_get_type_abort(
+                       handle->private_data, struct np_proxy_state);
 
                subreq = tstream_writev_queue_send(state, ev,
-                                                  p->npipe,
-                                                  p->write_queue,
+                                                  state->np->npipe,
+                                                  state->np->write_queue,
                                                   &state->iov, 1);
                if (subreq == NULL) {
                        goto fail;
@@ -592,12 +658,21 @@ static void np_write_done(struct tevent_req *subreq)
        struct np_write_state *state = tevent_req_data(
                req, struct np_write_state);
        ssize_t received;
-       int err;
 
-       received = tstream_writev_queue_recv(subreq, &err);
-       if (received < 0) {
-               tevent_req_nterror(req, map_nt_error_from_unix(err));
-               return;
+       if (state->type == FAKE_FILE_TYPE_NAMED_PIPE) {
+               NTSTATUS status;
+               status = write_to_internal_pipe_recv(subreq, &received);
+               if (tevent_req_nterror(req, status)) {
+                       return;
+               }
+       }
+       if (state->type == FAKE_FILE_TYPE_NAMED_PIPE_PROXY) {
+               int err;
+               received = tstream_writev_queue_recv(subreq, &err);
+               if (received < 0) {
+                       tevent_req_nterror(req, map_nt_error_from_unix(err));
+                       return;
+               }
        }
        state->nwritten = received;
        tevent_req_done(req);