libcli/smb/smb_direct.c
author Stefan Metzmacher <metze@samba.org>
Wed, 28 Sep 2016 11:45:50 +0000 (13:45 +0200)
committer Stefan Metzmacher <metze@samba.org>
Fri, 1 Jun 2018 12:35:05 +0000 (14:35 +0200)
libcli/smb/smb_direct.c
libcli/smb/smb_direct.h
libcli/smb/smb_direct_daemon.c

index 92c8feea232cd367f2657f796cc5255a19fd673b..ddd650cc816a2c22395ff7ccd47982abc99424fe 100644 (file)
 #include "includes.h"
 #include "system/network.h"
 #include <tevent.h>
-#include "../util/tevent_ntstatus.h"
-#include "../lib/tsocket/tsocket.h"
+#include "lib/util/tevent_ntstatus.h"
+#include "lib/tsocket/tsocket.h"
 #include "lib/util/util_net.h"
+#include "libcli/smb/smb_common.h"
 #include "libcli/smb/smb_direct.h"
 #include "lib/util/dlinklist.h"
+#include "lib/util/iov_buf.h"
 
 #ifdef SMB_TRANSPORT_ENABLE_RDMA
 #include <rdma/rdma_cma_abi.h>
@@ -34,6 +36,7 @@
 struct smb_direct_io;
 
 struct smb_direct_connection {
+       struct tevent_context *last_ev;
        struct {
                uint32_t max_send_size;
                uint32_t max_receive_size;
@@ -70,15 +73,28 @@ struct smb_direct_connection {
 
        TALLOC_CTX *io_mem_ctx;
        struct {
+               /*
+                * here we have io coming into
+                * the rdma layer, which needs to
+                * be flushed to the socketpair
+                */
                struct smb_direct_io *idle;
-               struct smb_direct_io *pending;
+               struct smb_direct_io *posted;
                struct smb_direct_io *ready;
-       } in;
+               struct smb_direct_io *out;
+               uint32_t remaining_length;
+       } r2s;
        struct {
+               /*
+                * here we have io coming from the socketpair
+                * which needs to be flushed into the rdma layer.
+                */
                struct smb_direct_io *idle;
-               struct smb_direct_io *pending;
+               struct smb_direct_io *posted;
                struct smb_direct_io *ready;
-       } out;
+               struct smb_direct_io *in;
+               uint32_t remaining_length;
+       } s2r;
 };
 
 #define SMB_DIRECT_IO_MAX_DATA 8192
@@ -90,8 +106,15 @@ struct smb_direct_io {
        struct ibv_mr *data_mr;
        struct ibv_sge sge[2];
 
-       struct ibv_recv_wr rwr;
-       struct ibv_send_wr swr;
+       struct ibv_recv_wr recv_wr;
+       struct ibv_send_wr send_wr;
+
+       struct iovec _iov_array[2];
+       struct iovec *iov;
+       int iov_count;
+
+       uint32_t data_length;
+       uint32_t remaining_length;
 
        uint8_t nbt_hdr[0x04];
        uint8_t smbd_hdr[0x18];
@@ -132,6 +155,23 @@ static struct smb_direct_io *smb_direct_io_create(struct smb_direct_connection *
                return NULL;
        }
 
+       io->sge[0].addr = (uint64_t) (uintptr_t) io->smbd_hdr;
+       io->sge[0].length = sizeof(io->smbd_hdr);
+       io->sge[0].lkey = io->hdr_mr->lkey;
+       io->sge[1].addr = (uint64_t) (uintptr_t) io->data;
+       io->sge[1].length = sizeof(io->data);
+       io->sge[1].lkey = io->data_mr->lkey;
+
+       io->send_wr.wr_id = (uint64_t) (uintptr_t) io;
+       io->send_wr.opcode = IBV_WR_SEND;
+       io->send_wr.send_flags = IBV_SEND_SIGNALED;
+       io->send_wr.sg_list = io->sge;
+       io->send_wr.num_sge = ARRAY_SIZE(io->sge);
+
+       io->recv_wr.wr_id = (uint64_t) (uintptr_t) io;
+       io->recv_wr.sg_list = io->sge;
+       io->recv_wr.num_sge = ARRAY_SIZE(io->sge);
+
        return io;
 }
 
@@ -281,14 +321,14 @@ struct smb_direct_connection *smb_direct_connection_create(TALLOC_CTX *mem_ctx)
                struct smb_direct_io *io;
 
                io = smb_direct_io_create(c);
-               DLIST_ADD_END(c->in.idle, io);
+               DLIST_ADD_END(c->r2s.idle, io);
        }
 
        for (i = 0; i < c->state.send_credit_target; i++) {
                struct smb_direct_io *io;
 
                io = smb_direct_io_create(c);
-               DLIST_ADD_END(c->out.idle, io);
+               DLIST_ADD_END(c->s2r.idle, io);
        }
 
        return c;
@@ -312,8 +352,8 @@ static int smb_direct_connection_destructor(struct smb_direct_connection *c)
        TALLOC_FREE(c->rdma.fde_channel);
 
        TALLOC_FREE(c->io_mem_ctx);
-       ZERO_STRUCT(c->in);
-       ZERO_STRUCT(c->out);
+       ZERO_STRUCT(c->r2s);
+       ZERO_STRUCT(c->s2r);
 
        if (c->rdma.cm_event != NULL) {
                rdma_ack_cm_event(c->rdma.cm_event);
@@ -358,10 +398,159 @@ static int smb_direct_connection_destructor(struct smb_direct_connection *c)
        return 0;
 }
 
+/*
+ * Post every idle receive buffer of the rdma -> socketpair direction
+ * (c->r2s.idle) to the queue pair with a single ibv_post_recv() call.
+ *
+ * The recv_wr of each idle io is linked to the recv_wr of its list
+ * successor, so the whole idle list is handed over as one chain of
+ * work requests. On success the idle list is appended to
+ * c->r2s.posted and c->r2s.idle is left empty.
+ *
+ * Returns 0 on success or if there was nothing to post, otherwise the
+ * non-zero result of ibv_post_recv(); the idle/posted lists are not
+ * modified in that case.
+ *
+ * NOTE(review): bad_recv_wr is not evaluated, so a chain that was only
+ * partially accepted is not tracked.
+ */
+static int smb_direct_connection_post_recv(struct smb_direct_connection *c)
+{
+       struct smb_direct_io *io = NULL;
+       struct ibv_recv_wr *bad_recv_wr = NULL;
+       int ret;
+
+       if (c->r2s.idle == NULL) {
+               return 0;
+       }
+
+       /* Mirror the list order in the work request chain. */
+       for (io = c->r2s.idle; io != NULL; io = io->next) {
+               if (io->next != NULL) {
+                       io->recv_wr.next = &io->next->recv_wr;
+               } else {
+                       io->recv_wr.next = NULL;
+               }
+       }
+
+       errno = 0;
+       ret = ibv_post_recv(c->ibv.qp, &c->r2s.idle->recv_wr, &bad_recv_wr);
+       if (ret != 0) {
+               NTSTATUS status;
+
+               /*
+                * ibv_post_recv() is documented to return the errno
+                * value itself instead of setting errno, so prefer a
+                * positive return value and only fall back to errno.
+                */
+               status = map_nt_error_from_unix_common(ret > 0 ? ret : errno);
+               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                       __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+               return ret;
+       }
+
+       DLIST_CONCATENATE(c->r2s.posted, c->r2s.idle);
+       c->r2s.idle = NULL;
+
+       return 0;
+}
+
+/*
+ * Build SMB-Direct data transfer headers for the ios queued in
+ * c->s2r.ready and post them to the queue pair as one chain of send
+ * work requests.
+ *
+ * One send credit is consumed per io and the loop stops once the
+ * credits are used up. Each handled io is moved from c->s2r.ready to a
+ * local list, which is appended to c->s2r.posted after a successful
+ * ibv_post_send().
+ *
+ * Returns 0 on success or if there is nothing to do, otherwise the
+ * non-zero result of ibv_post_send().
+ *
+ * NOTE(review): the function currently returns 0 before doing any
+ * work, see the "if (1)" below.
+ */
+static int smb_direct_connection_post_send(struct smb_direct_connection *c)
+{
+       struct smb_direct_io *io = NULL;
+       struct smb_direct_io *next = NULL;
+       struct smb_direct_io *posted = NULL;
+       struct smb_direct_io *last = NULL;
+       struct ibv_send_wr *bad_send_wr = NULL;
+       int ret;
+
+       if (c->s2r.ready == NULL) {
+               return 0;
+       }
+
+       if (c->state.send_credits == 0) {
+               return 0;
+       }
+
+       /*
+        * NOTE(review): this unconditional return keeps the send path
+        * switched off, everything below is unreachable for now.
+        */
+       if (1) {
+               return 0;
+       }
+
+       for (io = c->s2r.ready; io != NULL; io = next) {
+               uint16_t granted = 0;
+               uint16_t flags = 0;
+               uint32_t data_offset = 0;
+
+               /* io is unlinked below, remember its successor first */
+               next = io->next;
+
+               if (c->state.send_credits == 0) {
+                       break;
+               }
+
+               c->state.send_credits -= 1;
+
+               if (c->state.send_credits == 0) {
+                       /*
+                        * Last credit used: set flag bit 0x0001
+                        * (presumably "response requested" in MS-SMBD,
+                        * TODO confirm).
+                        */
+                       flags |= 0x0001;
+               }
+
+               /* grant what is still missing up to the maximum, capped by the target */
+               granted = c->state.receive_credit_max;
+               granted -= c->state.receive_credits;
+               granted = MIN(granted, c->state.receive_credit_target);
+               c->state.receive_credits += granted;
+
+               if (io->data_length > 0) {
+                       /* 0x18 bytes of header followed by the payload */
+                       data_offset = 0x18;
+                       io->sge[0].length = data_offset;
+                       io->sge[1].length = io->data_length;
+                       io->send_wr.num_sge = 2;
+               } else {
+                       /* header only: 0x14 bytes, no payload element */
+                       io->sge[0].length = 0x14;
+                       io->send_wr.num_sge = 1;
+               }
+
+               /*
+                * Fill the header. The field meanings noted in the
+                * comment above (credits, flags, lengths) follow the
+                * values written here; the wire names are defined by
+                * MS-SMBD -- TODO confirm the offsets against it.
+                */
+               SSVAL(io->smbd_hdr, 0x00, c->state.send_credit_target);
+               SSVAL(io->smbd_hdr, 0x02, granted);
+               SSVAL(io->smbd_hdr, 0x04, flags);
+               SSVAL(io->smbd_hdr, 0x06, 0x0000);
+               SIVAL(io->smbd_hdr, 0x08, io->remaining_length);
+               SIVAL(io->smbd_hdr, 0x0C, data_offset);
+               SIVAL(io->smbd_hdr, 0x10, io->data_length);
+               SIVAL(io->smbd_hdr, 0x14, 0x00000000);
+
+               DLIST_REMOVE(c->s2r.ready, io);
+               DLIST_ADD_END(posted, io);
+
+               /*
+                * Chain the work requests in the order of the local
+                * list; the newest element always terminates the chain.
+                */
+               io->send_wr.next = NULL;
+               if (last != NULL) {
+                       last->send_wr.next = &io->send_wr;
+               }
+               last = io;
+       }
+
+       if (posted == NULL) {
+               return 0;
+       }
+
+       errno = 0;
+       /* the ios were moved off c->s2r.ready, so post the local chain */
+       ret = ibv_post_send(c->ibv.qp, &posted->send_wr, &bad_send_wr);
+       if (ret != 0) {
+               NTSTATUS status;
+
+               /*
+                * TODO: evaluate bad_send_wr and move the ios that
+                * were not posted back, e.g. with
+                * DLIST_CONCATENATE(c->s2r.ready, posted). Right now
+                * they stay on the local list only.
+                *
+                * ibv_post_send() is documented to return the errno
+                * value itself instead of setting errno, so prefer a
+                * positive return value and only fall back to errno.
+                */
+               status = map_nt_error_from_unix_common(ret > 0 ? ret : errno);
+               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                       __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+               return ret;
+       }
+
+       DLIST_CONCATENATE(c->s2r.posted, posted);
+
+       return 0;
+}
+
+static int smb_direct_connection_setup_readv(struct smb_direct_connection *c)
+{
+       TEVENT_FD_READABLE(c->sock.fde); /* ask tevent for read events on c->sock.fde */
+       // TODO: use an immediate event instead? That may skip a syscall.
+       return 0; /* no failure path yet */
+}
+
+static int smb_direct_connection_setup_writev(struct smb_direct_connection *c)
+{
+       TEVENT_FD_WRITEABLE(c->sock.fde); /* ask tevent for write events on c->sock.fde */
+       // TODO: use an immediate event instead? That may skip a syscall.
+       return 0; /* no failure path yet */
+}
+
 struct smb_direct_connection_rdma_connect_state {
        struct smb_direct_connection *c;
 };
 
+/*
+ * talloc destructor of the rdma connect request state: releases the fd
+ * event stored in the connection's rdma.fde_channel, so it does not
+ * outlive the request. Always returns 0.
+ */
+static int smb_direct_connection_rdma_connect_state_destructor(
+              struct smb_direct_connection_rdma_connect_state *state)
+{
+       TALLOC_FREE(state->c->rdma.fde_channel);
+       return 0;
+}
+
 static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev,
                                            struct tevent_fd *fde,
                                            uint16_t flags,
@@ -392,17 +581,19 @@ static struct tevent_req *smb_direct_connection_rdma_connect_send(TALLOC_CTX *me
        }
        state->c = c;
 
-       state->c->rdma.fde_channel = tevent_add_fd(ev, state->c,
-                                       state->c->rdma.cm_channel->fd,
+       talloc_set_destructor(state, smb_direct_connection_rdma_connect_state_destructor);
+
+       c->rdma.fde_channel = tevent_add_fd(ev, c,
+                                       c->rdma.cm_channel->fd,
                                        TEVENT_FD_READ,
                                        smb_direct_connection_rdma_connect_handler,
                                        req);
-       if (tevent_req_nomem(state->c->rdma.fde_channel, req)) {
+       if (tevent_req_nomem(c->rdma.fde_channel, req)) {
                return tevent_req_post(req, ev);
        }
 
        errno = 0;
-       ret = rdma_resolve_addr(state->c->rdma.cm_id,
+       ret = rdma_resolve_addr(c->rdma.cm_id,
                                src_addr, dst_addr,
                                5000);
        if (ret != 0) {
@@ -412,7 +603,7 @@ static struct tevent_req *smb_direct_connection_rdma_connect_send(TALLOC_CTX *me
                tevent_req_nterror(req, status);
                return tevent_req_post(req, ev);
        }
-       state->c->rdma.expected_event = RDMA_CM_EVENT_ADDR_RESOLVED;
+       c->rdma.expected_event = RDMA_CM_EVENT_ADDR_RESOLVED;
 
        return req;
 }
@@ -428,6 +619,7 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
        struct smb_direct_connection_rdma_connect_state *state =
                tevent_req_data(req,
                struct smb_direct_connection_rdma_connect_state);
+       struct smb_direct_connection *c = state->c;
        struct rdma_conn_param conn_param;
        uint8_t ird_ord_hdr[8];
        NTSTATUS status = NT_STATUS_INTERNAL_ERROR;
@@ -435,8 +627,8 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
 
        errno = 0;
 
-       ret = rdma_get_cm_event(state->c->rdma.cm_channel,
-                               &state->c->rdma.cm_event);
+       ret = rdma_get_cm_event(c->rdma.cm_channel,
+                               &c->rdma.cm_event);
        if (ret != 0) {
                status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -446,8 +638,8 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
        }
 
        errno = 0;
-       if (state->c->rdma.cm_event->status != 0) {
-               errno = state->c->rdma.cm_event->status;
+       if (c->rdma.cm_event->status != 0) {
+               errno = c->rdma.cm_event->status;
                status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                        __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
@@ -455,16 +647,16 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                return;
        }
 
-       if (state->c->rdma.cm_event->event != state->c->rdma.expected_event) {
+       if (c->rdma.cm_event->event != c->rdma.expected_event) {
                DEBUG(0,("%s:%s: ret[%d] errno[%d]\n",
                        __location__, __FUNCTION__, ret, errno));
 
        }
 
-       switch (state->c->rdma.cm_event->event) {
+       switch (c->rdma.cm_event->event) {
        case RDMA_CM_EVENT_ADDR_RESOLVED:
        errno = 0;
-               ret = rdma_resolve_route(state->c->rdma.cm_id, 5000);
+               ret = rdma_resolve_route(c->rdma.cm_id, 5000);
                if (ret != 0) {
                        status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -472,14 +664,14 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                        tevent_req_nterror(req, status);
                        return;
                }
-               state->c->rdma.expected_event = RDMA_CM_EVENT_ROUTE_RESOLVED;
+               c->rdma.expected_event = RDMA_CM_EVENT_ROUTE_RESOLVED;
                break;
        case RDMA_CM_EVENT_ROUTE_RESOLVED:
        errno = 0;
        ret = 0;
 #if 0
-               state->c->ibv.pd = ibv_alloc_pd(state->c->rdma.cm_id->verbs);
-               if (state->c->ibv.pd == NULL) {
+               c->ibv.pd = ibv_alloc_pd(c->rdma.cm_id->verbs);
+               if (c->ibv.pd == NULL) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
@@ -487,8 +679,8 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                        return;
                }
 
-               state->c->ibv.comp_channel = ibv_create_comp_channel(state->c->rdma.cm_id->verbs);
-               if (state->c->ibv.comp_channel == NULL) {
+               c->ibv.comp_channel = ibv_create_comp_channel(c->rdma.cm_id->verbs);
+               if (c->ibv.comp_channel == NULL) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
@@ -496,8 +688,8 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                        return;
                }
 
-               set_blocking(state->c->ibv.comp_channel->fd, false);
-               smb_set_close_on_exec(state->c->ibv.comp_channel->fd);
+               set_blocking(c->ibv.comp_channel->fd, false);
+               smb_set_close_on_exec(c->ibv.comp_channel->fd);
 
                ZERO_STRUCT(init_attr);
                init_attr.cap.max_send_wr = 2;
@@ -507,35 +699,35 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                init_attr.qp_type = IBV_QPT_RC;
                init_attr.sq_sig_all = 1;
 
-               state->c->ibv.send_cq = ibv_create_cq(state->c->rdma.cm_id->verbs,
+               c->ibv.send_cq = ibv_create_cq(c->rdma.cm_id->verbs,
                                                      init_attr.cap.max_send_wr,
-                                                     state->c,
-                                                     state->c->ibv.comp_channel,
+                                                     c,
+                                                     c->ibv.comp_channel,
                                                      0);
-               if (state->c->ibv.send_cq == NULL) {
+               if (c->ibv.send_cq == NULL) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        tevent_req_nterror(req, status);
                        return;
                }
-               init_attr.send_cq = state->c->ibv.send_cq;
-               state->c->ibv.recv_cq = ibv_create_cq(state->c->rdma.cm_id->verbs,
+               init_attr.send_cq = c->ibv.send_cq;
+               c->ibv.recv_cq = ibv_create_cq(c->rdma.cm_id->verbs,
                                                      init_attr.cap.max_recv_wr,
-                                                     state->c,
-                                                     state->c->ibv.comp_channel,
+                                                     c,
+                                                     c->ibv.comp_channel,
                                                      0);
-               if (state->c->ibv.recv_cq == NULL) {
+               if (c->ibv.recv_cq == NULL) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        tevent_req_nterror(req, status);
                        return;
                }
-               init_attr.recv_cq = state->c->ibv.recv_cq;
+               init_attr.recv_cq = c->ibv.recv_cq;
 
                errno = 0;
-               ret = ibv_req_notify_cq(state->c->ibv.send_cq, 0);
+               ret = ibv_req_notify_cq(c->ibv.send_cq, 0);
                if (ret != 0) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -545,7 +737,7 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                }
 
                errno = 0;
-               ret = ibv_req_notify_cq(state->c->ibv.recv_cq, 0);
+               ret = ibv_req_notify_cq(c->ibv.recv_cq, 0);
                if (ret != 0) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -555,7 +747,7 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                }
 
                errno = 0;
-               ret = rdma_create_qp(state->c->rdma.cm_id, state->c->ibv.pd,
+               ret = rdma_create_qp(c->rdma.cm_id, c->ibv.pd,
                                     &init_attr);
                if (ret != 0) {
                        status = map_nt_error_from_unix_common(errno);
@@ -564,7 +756,7 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                        tevent_req_nterror(req, status);
                        return;
                }
-               state->c->ibv.qp = state->c->rdma.cm_id->qp;
+               c->ibv.qp = c->rdma.cm_id->qp;
 #endif
                RSIVAL(ird_ord_hdr, 0, 16);
                RSIVAL(ird_ord_hdr, 4, 0);
@@ -577,7 +769,7 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                conn_param.retry_count = 10;
 
                errno = 0;
-               ret = rdma_connect(state->c->rdma.cm_id, &conn_param);
+               ret = rdma_connect(c->rdma.cm_id, &conn_param);
                if (ret != 0) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -585,7 +777,7 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                        tevent_req_nterror(req, status);
                        return;
                }
-               state->c->rdma.expected_event = RDMA_CM_EVENT_ESTABLISHED;
+               c->rdma.expected_event = RDMA_CM_EVENT_ESTABLISHED;
                break;
 
        case RDMA_CM_EVENT_ESTABLISHED:
@@ -594,10 +786,10 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                DEBUG(0,("%s:%s: ret[%d] errno[%d]\n",
                        __location__, __FUNCTION__, ret, errno));
 
-               state->c->rdma.expected_event = RDMA_CM_EVENT_DISCONNECTED;
-               TALLOC_FREE(state->c->rdma.fde_channel);
-               rdma_ack_cm_event(state->c->rdma.cm_event);
-               state->c->rdma.cm_event = NULL;
+               c->rdma.expected_event = RDMA_CM_EVENT_DISCONNECTED;
+               TALLOC_FREE(c->rdma.fde_channel);
+               rdma_ack_cm_event(c->rdma.cm_event);
+               c->rdma.cm_event = NULL;
                tevent_req_done(req);
                return;
 
@@ -617,36 +809,23 @@ static void smb_direct_connection_rdma_connect_handler(struct tevent_context *ev
                status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                DEBUG(0,("%s:%s: event[%d] ret[%d] errno[%d] status[%s]\n",
                        __location__, __FUNCTION__,
-                       state->c->rdma.cm_event->event, ret, errno, nt_errstr(status)));
+                       c->rdma.cm_event->event, ret, errno, nt_errstr(status)));
                tevent_req_nterror(req, status);
                return;
        }
 
-       rdma_ack_cm_event(state->c->rdma.cm_event);
-       state->c->rdma.cm_event = NULL;
+       rdma_ack_cm_event(c->rdma.cm_event);
+       c->rdma.cm_event = NULL;
 }
 
 static NTSTATUS smb_direct_connection_rdma_connect_recv(struct tevent_req *req)
 {
-       struct smb_direct_connection_rdma_connect_state *state =
-               tevent_req_data(req,
-               struct smb_direct_connection_rdma_connect_state);
-       NTSTATUS status;
-
-       // TODO: _cleanup_fn
-       TALLOC_FREE(state->c->rdma.fde_channel);
-
-       if (tevent_req_is_nterror(req, &status)) {
-               tevent_req_received(req);
-               return status;
-       }
-
-       tevent_req_received(req);
-       return NT_STATUS_OK;
+       return tevent_req_simple_recv_ntstatus(req); /* rdma.fde_channel is now released by the request state's talloc destructor */
 }
 
 struct smb_direct_connection_negotiate_connect_state {
        struct smb_direct_connection *c;
+#if 0
        struct {
                struct ibv_sge sge[1];
                struct ibv_send_wr wr;
@@ -663,18 +842,21 @@ struct smb_direct_connection_negotiate_connect_state {
                struct ibv_sge sge[1];
                struct ibv_recv_wr wr;
        } rep;
+#endif
 };
 
 static int smb_direct_connection_negotiate_connect_destructor(
               struct smb_direct_connection_negotiate_connect_state *state)
 {
-       TALLOC_FREE(state->c->ibv.fde_channel);
-       TALLOC_FREE(state->c->rdma.fde_channel);
+       struct smb_direct_connection *c = state->c;
 
-       if (state->req.mr != NULL) {
-               ibv_dereg_mr(state->req.mr);
-               state->req.mr = NULL;
-       }
+       TALLOC_FREE(c->ibv.fde_channel); /* fd event on the completion channel */
+       TALLOC_FREE(c->rdma.fde_channel); /* fd event on the rdma cm channel */
+       /* NOTE(review): the MR cleanup below is disabled; state->req appears to be compiled out of the state struct via #if 0 */
+//     if (state->req.mr != NULL) {
+//             ibv_dereg_mr(state->req.mr);
+//             state->req.mr = NULL;
+//     }
 
        return 0;
 }
@@ -694,6 +876,9 @@ static struct tevent_req *smb_direct_connection_negotiate_connect_send(TALLOC_CT
 {
        struct tevent_req *req;
        struct smb_direct_connection_negotiate_connect_state *state;
+       struct smb_direct_io *rdma_read = NULL;
+       struct smb_direct_io *neg_send = NULL;
+       struct smb_direct_io *neg_recv = NULL;
        struct ibv_recv_wr *bad_recv_wr = NULL;
        struct ibv_send_wr *bad_send_wr = NULL;
        NTSTATUS status;
@@ -706,76 +891,69 @@ static struct tevent_req *smb_direct_connection_negotiate_connect_send(TALLOC_CT
        }
        state->c = c;
 
+       // TODO: cleanup
        talloc_set_destructor(state, smb_direct_connection_negotiate_connect_destructor);
 
-       state->c->ibv.fde_channel = tevent_add_fd(ev, state->c,
-                                               state->c->ibv.comp_channel->fd,
-                                               TEVENT_FD_READ,
-                                               smb_direct_connection_negotiate_connect_ibv_handler,
-                                               req);
-       if (tevent_req_nomem(state->c->ibv.fde_channel, req)) {
+       c->rdma.fde_channel = tevent_add_fd(ev, c, c->rdma.cm_channel->fd,
+                                           TEVENT_FD_READ,
+                                           smb_direct_connection_negotiate_connect_rdma_handler,
+                                           req);
+       if (tevent_req_nomem(c->rdma.fde_channel, req)) {
                return tevent_req_post(req, ev);
        }
-       state->c->rdma.fde_channel = tevent_add_fd(ev, state->c,
-                                               state->c->rdma.cm_channel->fd,
-                                               TEVENT_FD_READ,
-                                               smb_direct_connection_negotiate_connect_rdma_handler,
-                                               req);
-       if (tevent_req_nomem(state->c->rdma.fde_channel, req)) {
+       c->ibv.fde_channel = tevent_add_fd(ev, c, c->ibv.comp_channel->fd,
+                                          TEVENT_FD_READ,
+                                          smb_direct_connection_negotiate_connect_ibv_handler,
+                                          req);
+       if (tevent_req_nomem(c->ibv.fde_channel, req)) {
                return tevent_req_post(req, ev);
        }
 
-       state->rdma_read.sge[0].addr = 1;
-       state->rdma_read.sge[0].length = 0;
-       state->rdma_read.sge[0].lkey = 1;
-       state->rdma_read.wr.opcode = IBV_WR_RDMA_READ;
-       state->rdma_read.wr.send_flags = IBV_SEND_SIGNALED;
-       state->rdma_read.wr.sg_list = state->rdma_read.sge;
-       state->rdma_read.wr.num_sge = ARRAY_SIZE(state->rdma_read.sge);
-       state->rdma_read.wr.wr.rdma.rkey = 1;
-       state->rdma_read.wr.wr.rdma.remote_addr = 1;
-
-       SSVAL(state->req.buffer, 0x00, 0x0100);
-       SSVAL(state->req.buffer, 0x02, 0x0100);
-       SSVAL(state->req.buffer, 0x04, 0x0000);
-       SSVAL(state->req.buffer, 0x06, state->c->state.send_credit_target);
-       SIVAL(state->req.buffer, 0x08, state->c->state.max_send_size);
-       SIVAL(state->req.buffer, 0x0C, state->c->state.max_receive_size);
-       SIVAL(state->req.buffer, 0x10, state->c->state.max_fragmented_size);
-
-       state->req.mr = ibv_reg_mr(state->c->ibv.pd,
-                                  state->req.buffer,
-                                  0x14,//sizeof(state->req.buffer),
-                                  0);
-       if (tevent_req_nomem(state->req.mr, req)) {
+       neg_recv = smb_direct_io_create(c);
+       if (tevent_req_nomem(neg_recv, req)) {
                return tevent_req_post(req, ev);
        }
+       neg_recv->sge[0].addr = (uint64_t) (uintptr_t) neg_recv->data;
+       neg_recv->sge[0].length = sizeof(neg_recv->data);
+       neg_recv->sge[0].lkey = neg_recv->data_mr->lkey;
+       neg_recv->recv_wr.sg_list = neg_recv->sge;
+       neg_recv->recv_wr.num_sge = 1;
 
-       state->req.sge[0].addr = (uint64_t) (uintptr_t) state->req.buffer;
-       state->req.sge[0].length = 0x14;//sizeof(state->req.buffer);
-       state->req.sge[0].lkey = state->req.mr->lkey;
-       state->req.wr.opcode = IBV_WR_SEND;
-       state->req.wr.send_flags = IBV_SEND_SIGNALED;
-       state->req.wr.sg_list = state->req.sge;
-       state->req.wr.num_sge = ARRAY_SIZE(state->req.sge);
-
-       state->rep.mr = ibv_reg_mr(state->c->ibv.pd,
-                                  state->rep.buffer,
-                                  sizeof(state->rep.buffer),
-                                  IBV_ACCESS_LOCAL_WRITE);
-       if (tevent_req_nomem(state->rep.mr, req)) {
+       rdma_read = smb_direct_io_create(c);
+       if (tevent_req_nomem(rdma_read, req)) {
                return tevent_req_post(req, ev);
        }
-
-       memset(state->rep.buffer, 0x1F, sizeof(state->rep.buffer));
-       state->rep.sge[0].addr = (uint64_t) (uintptr_t) state->rep.buffer;
-       state->rep.sge[0].length = sizeof(state->rep.buffer);
-       state->rep.sge[0].lkey = state->rep.mr->lkey;
-       state->rep.wr.sg_list = state->rep.sge;
-       state->rep.wr.num_sge = ARRAY_SIZE(state->rep.sge);
+       rdma_read->sge[0].addr = 1;
+       rdma_read->sge[0].length = 0;
+       rdma_read->sge[0].lkey = 1;
+       rdma_read->send_wr.opcode = IBV_WR_RDMA_READ;
+       rdma_read->send_wr.send_flags = IBV_SEND_SIGNALED;
+       rdma_read->send_wr.sg_list = rdma_read->sge;
+       rdma_read->send_wr.num_sge = 1;
+       rdma_read->send_wr.wr.rdma.rkey = 1;
+       rdma_read->send_wr.wr.rdma.remote_addr = 1;
+
+       neg_send = smb_direct_io_create(c);
+       if (tevent_req_nomem(neg_send, req)) {
+               return tevent_req_post(req, ev);
+       }
+       SSVAL(neg_send->data, 0x00, 0x0100);
+       SSVAL(neg_send->data, 0x02, 0x0100);
+       SSVAL(neg_send->data, 0x04, 0x0000);
+       SSVAL(neg_send->data, 0x06, c->state.send_credit_target);
+       SIVAL(neg_send->data, 0x08, c->state.max_send_size);
+       SIVAL(neg_send->data, 0x0C, c->state.max_receive_size);
+       SIVAL(neg_send->data, 0x10, c->state.max_fragmented_size);
+       neg_send->sge[0].addr = (uint64_t) (uintptr_t) neg_send->data;
+       neg_send->sge[0].length = 0x14;
+       neg_send->sge[0].lkey = neg_send->data_mr->lkey;
+       neg_send->send_wr.opcode = IBV_WR_SEND;
+       neg_send->send_wr.send_flags = IBV_SEND_SIGNALED;
+       neg_send->send_wr.sg_list = neg_send->sge;
+       neg_send->send_wr.num_sge = 1;
 
        errno = 0;
-       ret = ibv_post_recv(state->c->ibv.qp, &state->rep.wr, &bad_recv_wr);
+       ret = ibv_post_recv(c->ibv.qp, &neg_recv->recv_wr, &bad_recv_wr);
        if (ret != 0) {
                status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -784,9 +962,9 @@ static struct tevent_req *smb_direct_connection_negotiate_connect_send(TALLOC_CT
                return tevent_req_post(req, ev);
        }
 
-       state->rdma_read.wr.next = &state->req.wr;
+       rdma_read->send_wr.next = &neg_send->send_wr;
        errno = 0;
-       ret = ibv_post_send(state->c->ibv.qp, &state->rdma_read.wr, &bad_send_wr);
+       ret = ibv_post_send(c->ibv.qp, &rdma_read->send_wr, &bad_send_wr);
        if (ret != 0) {
                status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -811,13 +989,14 @@ static void smb_direct_connection_negotiate_connect_rdma_handler(struct tevent_c
        struct smb_direct_connection_negotiate_connect_state *state =
                tevent_req_data(req,
                struct smb_direct_connection_negotiate_connect_state);
+       struct smb_direct_connection *c = state->c;
        NTSTATUS status = NT_STATUS_INTERNAL_ERROR;
        int ret;
 
        errno = 0;
 
-       ret = rdma_get_cm_event(state->c->rdma.cm_channel,
-                               &state->c->rdma.cm_event);
+       ret = rdma_get_cm_event(c->rdma.cm_channel,
+                               &c->rdma.cm_event);
        if (ret != 0) {
                status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -826,8 +1005,8 @@ static void smb_direct_connection_negotiate_connect_rdma_handler(struct tevent_c
                return;
        }
 
-       if (state->c->rdma.cm_event->status != 0) {
-               errno = state->c->rdma.cm_event->status;
+       if (c->rdma.cm_event->status != 0) {
+               errno = c->rdma.cm_event->status;
                status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                        __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
@@ -835,7 +1014,7 @@ static void smb_direct_connection_negotiate_connect_rdma_handler(struct tevent_c
                return;
        }
 
-       switch (state->c->rdma.cm_event->event) {
+       switch (c->rdma.cm_event->event) {
        case RDMA_CM_EVENT_DISCONNECTED:
                status = NT_STATUS_CONNECTION_DISCONNECTED;
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -881,6 +1060,7 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
        struct smb_direct_connection_negotiate_connect_state *state =
                tevent_req_data(req,
                struct smb_direct_connection_negotiate_connect_state);
+       struct smb_direct_connection *c = state->c;
        struct ibv_cq *cq = NULL;
        void *cq_context = NULL;
        NTSTATUS status = NT_STATUS_INTERNAL_ERROR;
@@ -893,11 +1073,10 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
        uint32_t max_receive_size;
        uint32_t max_fragmented_size;
        uint32_t tmp;
-       uint8_t *ptr;
-       struct ibv_recv_wr *bad_recv_wr = NULL;
+       struct smb_direct_io *io = NULL;
 
        errno = 0;
-       ret = ibv_get_cq_event(state->c->ibv.comp_channel,
+       ret = ibv_get_cq_event(c->ibv.comp_channel,
                               &cq, &cq_context);
        if (ret != 0) {
                status = map_nt_error_from_unix_common(errno);
@@ -909,7 +1088,7 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
 
        ibv_ack_cq_events(cq, 1);
 
-       if (cq_context != state->c) {
+       if (cq_context != c) {
                status = NT_STATUS_INTERNAL_ERROR;;
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                        __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
@@ -944,8 +1123,8 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
                status = map_nt_error_from_unix_common(wc.status);//errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                        __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
-               TALLOC_FREE(state->c->ibv.fde_channel);
-               TALLOC_FREE(state->c->rdma.fde_channel);
+               TALLOC_FREE(c->ibv.fde_channel);
+               TALLOC_FREE(c->rdma.fde_channel);
                smb_direct_connection_negotiate_connect_rdma_handler(ev, fde, flags, private_data);
                return;
        }
@@ -959,19 +1138,24 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
                return;
        }
 
+       io = talloc_get_type_abort((void *)(uintptr_t)wc.wr_id,
+                                  struct smb_direct_io);
+
        switch (wc.opcode) {
        case IBV_WC_SEND:
-               DEBUG(0,("%s:%s: ret[%d] errno[%d]\n",
-                       __location__, __FUNCTION__, ret, errno));
+               DEBUG(0,("%s:%s: GOT SEND[%p] ret[%d] errno[%d]\n",
+                       __location__, __FUNCTION__, io, ret, errno));
+               TALLOC_FREE(io);
                break;
        case IBV_WC_RDMA_READ:
-               DEBUG(0,("%s:%s: ret[%d] errno[%d]\n",
-                       __location__, __FUNCTION__, ret, errno));
+               DEBUG(0,("%s:%s: GOT RDMA_READ[%p] ret[%d] errno[%d]\n",
+                       __location__, __FUNCTION__, io, ret, errno));
+               TALLOC_FREE(io);
                break;
        case IBV_WC_RECV:
-               DEBUG(0,("%s:%s: ret[%d] errno[%d]\n",
-                       __location__, __FUNCTION__, ret, errno));
-               dump_data(0, state->rep.buffer, wc.byte_len);
+               DEBUG(0,("%s:%s: GOT RECV[%p] ret[%d] errno[%d]\n",
+                       __location__, __FUNCTION__, io, ret, errno));
+               dump_data(0, io->data, wc.byte_len);
                if (wc.byte_len < 0x20) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -979,28 +1163,28 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
                        tevent_req_nterror(req, status);
                        return;
                }
-               if (SVAL(state->rep.buffer, 0x00) != 0x0100) {
+               if (SVAL(io->data, 0x00) != 0x0100) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        tevent_req_nterror(req, status);
                        return;
                }
-               if (SVAL(state->rep.buffer, 0x02) != 0x0100) {
+               if (SVAL(io->data, 0x02) != 0x0100) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        tevent_req_nterror(req, status);
                        return;
                }
-               if (SVAL(state->rep.buffer, 0x04) != 0x0100) {
+               if (SVAL(io->data, 0x04) != 0x0100) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        tevent_req_nterror(req, status);
                        return;
                }
-               credits_requested = SVAL(state->rep.buffer, 0x08);
+               credits_requested = SVAL(io->data, 0x08);
                if (credits_requested == 0) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1008,7 +1192,7 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
                        tevent_req_nterror(req, status);
                        return;
                }
-               credits_granted = SVAL(state->rep.buffer, 0x0A);
+               credits_granted = SVAL(io->data, 0x0A);
                if (credits_granted == 0) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1016,23 +1200,23 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
                        tevent_req_nterror(req, status);
                        return;
                }
-               status = NT_STATUS(IVAL(state->rep.buffer, 0x0C));
+               status = NT_STATUS(IVAL(io->data, 0x0C));
                if (!NT_STATUS_IS_OK(status)) {
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        tevent_req_nterror(req, status);
                        return;
                }
-               max_read_write_size = IVAL(state->rep.buffer, 0x10);
-               preferred_send_size = IVAL(state->rep.buffer, 0x14);
-               if (preferred_send_size > state->c->state.max_receive_size) {
+               max_read_write_size = IVAL(io->data, 0x10);
+               preferred_send_size = IVAL(io->data, 0x14);
+               if (preferred_send_size > c->state.max_receive_size) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        tevent_req_nterror(req, status);
                        return;
                }
-               max_receive_size = IVAL(state->rep.buffer, 0x18);
+               max_receive_size = IVAL(io->data, 0x18);
                if (max_receive_size < 0x80) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1040,7 +1224,7 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
                        tevent_req_nterror(req, status);
                        return;
                }
-               max_fragmented_size = IVAL(state->rep.buffer, 0x1C);
+               max_fragmented_size = IVAL(io->data, 0x1C);
                if (max_fragmented_size < 0x20000) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1049,66 +1233,36 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
                        return;
                }
 
-               state->c->state.receive_credit_target = credits_requested;
+               c->state.receive_credit_target = credits_requested;
 
-               tmp = state->c->state.max_receive_size;
+               tmp = c->state.max_receive_size;
                tmp = MIN(tmp, preferred_send_size);
                tmp = MAX(tmp, 128);
-               state->c->state.max_receive_size = tmp;
+               c->state.max_receive_size = tmp;
 
-               tmp = state->c->state.max_send_size;
+               tmp = c->state.max_send_size;
                tmp = MIN(tmp, max_receive_size);
-               state->c->state.max_send_size = tmp;
+               c->state.max_send_size = tmp;
 
                tmp = MIN(1048576, max_read_write_size);
-               state->c->state.max_read_write_size = tmp;
+               c->state.max_read_write_size = tmp;
 
-               tmp = state->c->state.max_fragmented_size;
+               tmp = c->state.max_fragmented_size;
                tmp = MIN(tmp, max_fragmented_size);
-               state->c->state.max_fragmented_size = tmp;
+               c->state.max_fragmented_size = tmp;
 
-               state->c->state.send_credits = credits_granted;
+               c->state.send_credits = credits_granted;
 
-               TALLOC_FREE(state->c->ibv.fde_channel);
-               TALLOC_FREE(state->c->rdma.fde_channel);
+               TALLOC_FREE(c->ibv.fde_channel);
+               TALLOC_FREE(c->rdma.fde_channel);
 
                DEBUG(0,("%s:%s: ret[%d] errno[%d]\n",
                        __location__, __FUNCTION__, ret, errno));
 
-               state->c->inbuf.hdr_mr = ibv_reg_mr(state->c->ibv.pd,
-                                                   state->c->inbuf.hdr,
-                                                   sizeof(state->c->inbuf.hdr),
-                                                   IBV_ACCESS_LOCAL_WRITE);
-               if (tevent_req_nomem(state->c->inbuf.hdr_mr, req)) {
-                       return;
-               }
-
-               ptr = talloc_array(state->c, uint8_t,
-                                  state->c->state.max_receive_size - 0x14);
-               if (tevent_req_nomem(ptr, req)) {
-                       return;
-               }
-               state->c->inbuf.reassembly_buffer = ptr;
-
-               state->c->inbuf.cur_mr = ibv_reg_mr(state->c->ibv.pd,
-                                                   ptr,
-                                                   talloc_get_size(ptr),
-                                                   IBV_ACCESS_LOCAL_WRITE);
-               if (tevent_req_nomem(state->c->inbuf.cur_mr, req)) {
-                       return;
-               }
-
-               state->c->inbuf.sge[0].addr = (uint64_t) (uintptr_t) state->c->inbuf.hdr;
-               state->c->inbuf.sge[0].length = sizeof(state->c->inbuf.hdr);
-               state->c->inbuf.sge[0].lkey = state->c->inbuf.hdr_mr->lkey;
-               state->c->inbuf.sge[1].addr = (uint64_t) (uintptr_t) (ptr + 4);
-               state->c->inbuf.sge[1].length = talloc_get_size(ptr) - 4;
-               state->c->inbuf.sge[1].lkey = state->c->inbuf.cur_mr->lkey;
-               state->c->inbuf.wr.sg_list = state->c->inbuf.sge;
-               state->c->inbuf.wr.num_sge = ARRAY_SIZE(state->c->inbuf.sge);
+               TALLOC_FREE(io);
 
                errno = 0;
-               ret = ibv_post_recv(state->c->ibv.qp, &state->c->inbuf.wr, &bad_recv_wr);
+               ret = smb_direct_connection_post_recv(c);
                if (ret != 0) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1132,13 +1286,104 @@ static void smb_direct_connection_negotiate_connect_ibv_handler(struct tevent_co
 
 static NTSTATUS smb_direct_connection_negotiate_connect_recv(struct tevent_req *req)
 {
+       return tevent_req_simple_recv_ntstatus(req); /* the only result is the NTSTATUS this helper yields for req */
+}
+
+struct smb_direct_connection_connect_state { /* request state created by smb_direct_connection_connect_send() */
+       struct tevent_context *ev; /* event context passed to connect_send, reused for the negotiate subrequest */
+       struct smb_direct_connection *c; /* connection passed to connect_send; connect_recv reads c->sock.tmp_fd from it */
+};
+
+static void smb_direct_connection_connect_done_rdma(struct tevent_req *subreq);
+static void smb_direct_connection_connect_done_negotiate(struct tevent_req *subreq);
+
+/*
+ * Begin the connect for c: start the RDMA level connect with src and dst;
+ * smb_direct_connection_connect_done_rdma() continues once that finished.
+ */
+struct tevent_req *smb_direct_connection_connect_send(TALLOC_CTX *mem_ctx,
+                                                     struct tevent_context *ev,
+                                                     struct smb_direct_connection *c,
+                                                     const struct sockaddr_storage *src,
+                                                     const struct sockaddr_storage *dst)
+{
+       struct smb_direct_connection_connect_state *connect_state = NULL;
+       struct tevent_req *connect_req = NULL;
+       struct tevent_req *rdma_req = NULL;
+
+       connect_req = tevent_req_create(mem_ctx, &connect_state,
+                                       struct smb_direct_connection_connect_state);
+       if (connect_req == NULL) {
+               return NULL;
+       }
+       connect_state->c = c;
+       connect_state->ev = ev;
+       rdma_req = smb_direct_connection_rdma_connect_send(connect_state, ev, c, src, dst, NULL, NULL);
+       if (tevent_req_nomem(rdma_req, connect_req)) {
+               return tevent_req_post(connect_req, ev);
+       }
+       tevent_req_set_callback(rdma_req, smb_direct_connection_connect_done_rdma, connect_req);
+       return connect_req;
+}
+
+/*
+ * Completion of the RDMA level connect: collect its status and, unless
+ * tevent_req_nterror() reports an error, start the negotiate subrequest.
+ */
+static void smb_direct_connection_connect_done_rdma(struct tevent_req *subreq)
+{
+       struct tevent_req *connect_req =
+               tevent_req_callback_data(subreq, struct tevent_req);
+       struct smb_direct_connection_connect_state *connect_state =
+               tevent_req_data(connect_req, struct smb_direct_connection_connect_state);
+       struct tevent_req *negotiate_req = NULL;
+       NTSTATUS rdma_status = smb_direct_connection_rdma_connect_recv(subreq);
+
+       TALLOC_FREE(subreq);
+       if (tevent_req_nterror(connect_req, rdma_status)) {
+               return;
+       }
+       negotiate_req = smb_direct_connection_negotiate_connect_send(connect_state,
+                                       connect_state->ev, connect_state->c);
+       if (!tevent_req_nomem(negotiate_req, connect_req)) {
+               tevent_req_set_callback(negotiate_req,
+                               smb_direct_connection_connect_done_negotiate, connect_req);
+       }
+}
+
+/*
+ * Negotiate finished: its status becomes the result of the connect request.
+ */
+static void smb_direct_connection_connect_done_negotiate(struct tevent_req *subreq)
+{
+       struct tevent_req *connect_req =
+               tevent_req_callback_data(subreq, struct tevent_req);
        NTSTATUS status;
 
+       status = smb_direct_connection_negotiate_connect_recv(subreq);
+       TALLOC_FREE(subreq);
+       if (!tevent_req_nterror(connect_req, status)) {
+               tevent_req_done(connect_req);
+       }
+}
+
+NTSTATUS smb_direct_connection_connect_recv(struct tevent_req *req, int *fd) /* collects the result of a connect request; *fd is always written */
+{
+       struct smb_direct_connection_connect_state *state =
+               tevent_req_data(req,
+               struct smb_direct_connection_connect_state);
+       struct smb_direct_connection *c = state->c; /* fetched up front, before tevent_req_received() is called on req */
+       NTSTATUS status;
+
+       *fd = -1; /* value the caller sees when the error branch below returns */
+
        if (tevent_req_is_nterror(req, &status)) {
                tevent_req_received(req);
                return status;
        }
 
+       *fd = c->sock.tmp_fd; /* success: pass the value of c->sock.tmp_fd to the caller */
+       c->sock.tmp_fd = -1; /* reset the connection's copy to -1 once it has been handed out */
        tevent_req_received(req);
        return NT_STATUS_OK;
 }
@@ -1150,27 +1395,13 @@ static void smb_direct_connection_disconnect(struct smb_direct_connection *c,
                status = NT_STATUS_UNEXPECTED_NETWORK_ERROR;
        }
 
-       if (c->reqs.read_pdu_req != NULL) {
-               tevent_req_defer_callback(c->reqs.read_pdu_req,
-                                         c->reqs.last_ev);
-               tevent_req_nterror(c->reqs.read_pdu_req, status);
-               c->reqs.read_pdu_req = NULL;
-       }
-
-       if (c->reqs.write_pdu_req != NULL) {
-               tevent_req_defer_callback(c->reqs.write_pdu_req,
-                                         c->reqs.last_ev);
-               tevent_req_nterror(c->reqs.write_pdu_req, status);
-               c->reqs.write_pdu_req = NULL;
-       }
-
        smb_direct_connection_destructor(c);
 }
 
 static void smb_direct_connection_rdma_handler(struct tevent_context *ev,
-                                                  struct tevent_fd *fde,
-                                                  uint16_t flags,
-                                                  void *private_data)
+                                              struct tevent_fd *fde,
+                                              uint16_t flags,
+                                              void *private_data)
 {
        struct smb_direct_connection *c =
                talloc_get_type_abort(private_data,
@@ -1235,9 +1466,9 @@ static void smb_direct_connection_rdma_handler(struct tevent_context *ev,
 }
 
 static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
-                                                 struct tevent_fd *fde,
-                                                 uint16_t fde_flags,
-                                                 void *private_data)
+                                             struct tevent_fd *fde,
+                                             uint16_t fde_flags,
+                                             void *private_data)
 {
        struct smb_direct_connection *c =
                talloc_get_type_abort(private_data,
@@ -1247,14 +1478,11 @@ static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
        NTSTATUS status = NT_STATUS_INTERNAL_ERROR;
        struct ibv_wc wc;
        int ret;
-       uint8_t *ptr;
        uint16_t credits_requested;
        uint16_t credits_granted;
        uint16_t flags;
-       uint32_t remaining_length;
        uint32_t data_offset;
-       uint32_t data_length;
-       struct ibv_recv_wr *bad_recv_wr = NULL;
+       struct smb_direct_io *io = NULL;
 
        errno = 0;
        ret = ibv_get_cq_event(c->ibv.comp_channel,
@@ -1320,23 +1548,37 @@ static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
                return;
        }
 
+       io = talloc_get_type_abort((void *)(uintptr_t)wc.wr_id,
+                                  struct smb_direct_io);
+
        switch (wc.opcode) {
        case IBV_WC_SEND:
-               DEBUG(0,("%s:%s: ret[%d] errno[%d]\n",
-                       __location__, __FUNCTION__, ret, errno));
-               if (c->reqs.write_pdu_req == NULL) {
-                       status = NT_STATUS_INTERNAL_ERROR;
+               DEBUG(0,("%s:%s: GOT SEND[%p] ret[%d] errno[%d]\n",
+                       __location__, __FUNCTION__, io, ret, errno));
+               DLIST_REMOVE(c->s2r.posted, io);
+               DLIST_ADD_END(c->s2r.idle, io);
+
+               errno = 0;
+               ret = smb_direct_connection_setup_readv(c);
+               if (ret != 0) {
+                       status = map_nt_error_from_unix_common(errno);
+                       DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                               __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        smb_direct_connection_disconnect(c, status);
                        return;
                }
-               tevent_req_defer_callback(c->reqs.write_pdu_req,
-                                         c->reqs.last_ev);
-               tevent_req_done(c->reqs.write_pdu_req);
                return;
 
        case IBV_WC_RECV:
-               DEBUG(0,("%s:%s: GOT WC_RECV ret[%d] errno[%d]\n",
-                       __location__, __FUNCTION__, ret, errno));
+               DEBUG(0,("%s:%s: GOT RECV[%p] ret[%d] errno[%d]\n",
+                       __location__, __FUNCTION__, io, ret, errno));
+               if (wc.byte_len >= c->state.max_receive_size) {
+                       status = NT_STATUS_INVALID_NETWORK_RESPONSE;
+                       DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                               __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+                       smb_direct_connection_disconnect(c, status);
+                       return;
+               }
                if (wc.byte_len < 0x14) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1344,8 +1586,9 @@ static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
                        smb_direct_connection_disconnect(c, status);
                        return;
                }
-               dump_data(0, c->inbuf.hdr, MIN(wc.byte_len, sizeof(c->inbuf.hdr)));
-               credits_requested = SVAL(c->inbuf.hdr, 0x00);
+               DLIST_REMOVE(c->r2s.posted, io);
+               dump_data(0, io->smbd_hdr, MIN(wc.byte_len, sizeof(io->smbd_hdr)));
+               credits_requested = SVAL(io->smbd_hdr, 0x00);
                if (credits_requested == 0) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1353,7 +1596,7 @@ static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
                        smb_direct_connection_disconnect(c, status);
                        return;
                }
-               credits_granted = SVAL(c->inbuf.hdr, 0x02);
+               credits_granted = SVAL(io->smbd_hdr, 0x02);
                if (credits_granted == 0) {
                        status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1361,10 +1604,10 @@ static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
                        smb_direct_connection_disconnect(c, status);
                        return;
                }
-               flags = SVAL(c->inbuf.hdr, 0x04);
-               remaining_length = IVAL(c->inbuf.hdr, 0x08);
-               data_offset = IVAL(c->inbuf.hdr, 0x0C);
-               data_length = IVAL(c->inbuf.hdr, 0x10);
+               flags = SVAL(io->smbd_hdr, 0x04);
+               io->remaining_length = IVAL(io->smbd_hdr, 0x08);
+               data_offset = IVAL(io->smbd_hdr, 0x0C);
+               io->data_length = IVAL(io->smbd_hdr, 0x10);
 
                c->state.receive_credits -= 1;
                c->state.receive_credit_target = credits_requested;
@@ -1378,9 +1621,19 @@ static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
                                smb_direct_connection_disconnect(c, status);
                                return;
                        }
-                       goto repost_receive;
+                       DLIST_ADD_END(c->r2s.idle, io);
+                       errno = 0;
+                       ret = smb_direct_connection_post_recv(c);
+                       if (ret != 0) {
+                               status = map_nt_error_from_unix_common(errno);
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
+                       }
+                       return;
                } else if (data_offset == 0x18) {
-                       if (data_length >= (c->state.max_receive_size - data_offset)) {
+                       if (io->data_length >= (c->state.max_receive_size - data_offset)) {
                                status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                        __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
@@ -1395,75 +1648,78 @@ static void smb_direct_connection_ibv_handler(struct tevent_context *ev,
                        return;
                }
 
-               if (remaining_length > 0) {
-                       // TODO: ...
-               }
-
-               if (flags) {
-                       // TODO: ...
-               }
+               if (c->r2s.remaining_length > 0) {
+                       if (io->data_length > c->r2s.remaining_length) {
+                               status = NT_STATUS_INVALID_NETWORK_RESPONSE;
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
+                       }
 
-               if (c->reqs.read_pdu_req == NULL) {
-                       // TODO: is this correct???
-                       goto repost_receive;
-               }
+                       c->r2s.remaining_length -= io->data_length;
 
-               if (c->reqs.read_pdu_req != NULL) {
-                       TALLOC_FREE(c->inbuf.full_buffer);
-                       if (c->inbuf.cur_mr != NULL) {
-                               ibv_dereg_mr(c->inbuf.cur_mr);
-                               c->inbuf.cur_mr = NULL;
+                       if (io->remaining_length != c->r2s.remaining_length) {
+                               status = NT_STATUS_INVALID_NETWORK_RESPONSE;
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
                        }
-                       ptr = c->inbuf.reassembly_buffer;
-                       ptr = talloc_realloc(c, ptr, uint8_t, 4 + data_length);
-                       if (ptr == NULL) {
-                               status = NT_STATUS_NO_MEMORY;
+
+                       io->iov = io->_iov_array;
+                       io->iov[0].iov_base = io->data;
+                       io->iov[0].iov_len = io->data_length;
+                       io->iov_count = 1;
+               } else {
+                       uint64_t total_length = io->data_length + io->remaining_length;
+
+                       if (total_length >= c->state.max_fragmented_size) { //correct direction
+                               status = NT_STATUS_INVALID_NETWORK_RESPONSE;
                                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                        __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                                smb_direct_connection_disconnect(c, status);
                                return;
                        }
 
-                       c->inbuf.full_buffer = ptr;
-                       c->inbuf.reassembly_buffer= NULL;
-                       tevent_req_defer_callback(c->reqs.read_pdu_req,
-                                                 c->reqs.last_ev);
-                       tevent_req_done(c->reqs.read_pdu_req);
-                       /* no return here */
+                       _smb_setlen_tcp(io->nbt_hdr, total_length);
+                       io->iov = io->_iov_array;
+                       io->iov[0].iov_base = io->nbt_hdr;
+                       io->iov[0].iov_len = sizeof(io->nbt_hdr);
+                       io->iov[1].iov_base = io->data;
+                       io->iov[1].iov_len = io->data_length;
+                       io->iov_count = 2;
                }
 
-               ptr = talloc_array(c, uint8_t,
-                                  c->state.max_receive_size - 0x14);
-               if (ptr == NULL) {
-                       status = NT_STATUS_NO_MEMORY;
+               if (c->state.receive_credits == 0) {
+                       // TODO: send more credits
+                       status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        smb_direct_connection_disconnect(c, status);
                        return;
                }
-               c->inbuf.reassembly_buffer = ptr;
-
-               c->inbuf.cur_mr = ibv_reg_mr(c->ibv.pd,
-                                                   ptr,
-                                                   talloc_get_size(ptr),
-                                                   IBV_ACCESS_LOCAL_WRITE);
-               if (c->inbuf.cur_mr == NULL) {
-                       status = NT_STATUS_NO_MEMORY;
+
+               if (flags & ~0x0001) {
+                       status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                                __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
                        smb_direct_connection_disconnect(c, status);
                        return;
                }
 
-               c->inbuf.sge[1].addr = (uint64_t) (uintptr_t) (ptr + 4);
-               c->inbuf.sge[1].length = talloc_get_size(ptr) - 4;
-               c->inbuf.sge[1].lkey = c->inbuf.cur_mr->lkey;
+               if (flags & 0x0001) {
+                       // TODO: send more credits
+                       status = map_nt_error_from_unix_common(errno);
+                       DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                               __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+                       smb_direct_connection_disconnect(c, status);
+                       return;
+               }
 
-repost_receive:
+               DLIST_ADD_END(c->r2s.ready, io);
                errno = 0;
-                       DEBUG(0,("%s:%s: REPOST_RECV...\n",
-                               __location__, __FUNCTION__));
-               ret = ibv_post_recv(c->ibv.qp, &c->inbuf.wr, &bad_recv_wr);
+               ret = smb_direct_connection_setup_writev(c);
                if (ret != 0) {
                        status = map_nt_error_from_unix_common(errno);
                        DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
@@ -1471,7 +1727,6 @@ repost_receive:
                        smb_direct_connection_disconnect(c, status);
                        return;
                }
-
                return;
 
        case IBV_WC_RDMA_READ:
@@ -1485,335 +1740,263 @@ repost_receive:
        }
 }
 
-static NTSTATUS smb_direct_connection_setup_events(struct smb_direct_connection *c,
-                                                      struct tevent_context *ev)
-{
-       if (tevent_fd_get_flags(c->ibv.fde_channel) == 0) {
-               c->reqs.last_ev = NULL;
-               TALLOC_FREE(c->ibv.fde_channel);
-               TALLOC_FREE(c->rdma.fde_channel);
-       }
-
-       if (tevent_fd_get_flags(c->rdma.fde_channel) == 0) {
-               c->reqs.last_ev = NULL;
-               TALLOC_FREE(c->ibv.fde_channel);
-               TALLOC_FREE(c->rdma.fde_channel);
-       }
-
-       if (c->reqs.read_pdu_req == NULL && c->reqs.write_pdu_req == NULL) {
-               c->reqs.last_ev = NULL;
-               TALLOC_FREE(c->ibv.fde_channel);
-               TALLOC_FREE(c->rdma.fde_channel);
-       }
-
-       if (ev == NULL) {
-               c->reqs.last_ev = NULL;
-               TALLOC_FREE(c->ibv.fde_channel);
-               TALLOC_FREE(c->rdma.fde_channel);
-       } else if (ev == c->reqs.last_ev) {
-               return NT_STATUS_OK;
-       } else if (c->reqs.last_ev == NULL) {
-               /* fallthrough */
-       } else {
-               return NT_STATUS_INVALID_PARAMETER_MIX;
-       }
-
-       c->ibv.fde_channel = tevent_add_fd(ev, c,
-                                          c->ibv.comp_channel->fd,
-                                          TEVENT_FD_READ,
-                                          smb_direct_connection_ibv_handler,
-                                          c);
-       if (c->ibv.fde_channel == NULL) {
-               return NT_STATUS_NO_MEMORY;
-       }
-       c->rdma.fde_channel = tevent_add_fd(ev, c,
-                                           c->rdma.cm_channel->fd,
-                                           TEVENT_FD_READ,
-                                           smb_direct_connection_rdma_handler,
-                                           c);
-       if (c->rdma.fde_channel == NULL) {
-               TALLOC_FREE(c->ibv.fde_channel);
-               return NT_STATUS_NO_MEMORY;
-       }
-
-       c->reqs.last_ev = ev;
-       return NT_STATUS_OK;
-}
-
-struct smb_direct_write_pdu_state {
-       struct smb_direct_connection *c;
-       uint8_t hdr[0x18];
-       uint8_t *buffer;
-       struct ibv_mr *hdr_mr;
-       struct ibv_mr *buffer_mr;
-       struct ibv_sge sge[2];
-       struct ibv_send_wr wr;
-};
-
-static int smb_direct_write_pdu_destructor(
-               struct smb_direct_write_pdu_state *state)
-{
-       if (state->c) {
-               state->c->reqs.write_pdu_req = NULL;
-       }
-
-       if (state->hdr_mr != NULL) {
-               ibv_dereg_mr(state->hdr_mr);
-               state->hdr_mr = NULL;
-       }
-       if (state->buffer_mr != NULL) {
-               ibv_dereg_mr(state->buffer_mr);
-               state->buffer_mr = NULL;
-       }
-
-       return 0;
-}
-
-static struct tevent_req *smb_direct_write_pdu_send(TALLOC_CTX *mem_ctx,
-                                      struct tevent_context *ev,
-                                      struct smb_transport *transport,
-                                      const struct iovec *vector,
-                                      size_t count)
+/*
+ * tevent fd handler for the socket side (c->sock.fd) of the connection.
+ *
+ * TEVENT_FD_WRITE: writev() the current rdma-to-socket io (c->r2s.out, or
+ * the head of c->r2s.ready) to the socket. A fully written io goes back
+ * to c->r2s.idle. Afterwards smb_direct_connection_post_recv() is called.
+ *
+ * TEVENT_FD_READ: readv() from the socket into a socket-to-rdma io taken
+ * from c->s2r.idle. The length header (io->nbt_hdr) is read first, then
+ * the payload in pieces of at most sizeof(io->data) bytes. Each completed
+ * piece is queued on c->s2r.ready. Afterwards
+ * smb_direct_connection_post_send() is called.
+ *
+ * Every hard error ends in smb_direct_connection_disconnect().
+ */
+static void smb_direct_connection_sock_handler(struct tevent_context *ev,
+                                              struct tevent_fd *fde,
+                                              uint16_t fde_flags,
+                                              void *private_data)
 {
-       struct tevent_req *req;
-       struct smb_direct_write_pdu_state *state;
        struct smb_direct_connection *c =
-               smb_transport_data(transport,
+               talloc_get_type_abort(private_data,
                struct smb_direct_connection);
-       size_t to_write = 0;
-       size_t current_len = 0;
-       size_t remaining_len = 0;
-       uint8_t *ptr = NULL;
-       size_t buf_len = 0;
-       size_t i;
-       NTSTATUS status;
+       NTSTATUS status = NT_STATUS_INTERNAL_ERROR;
+       struct smb_direct_io *io = NULL;
        int ret;
-       struct ibv_send_wr *bad_send_wr = NULL;
-       uint16_t granted = 0;
-       uint16_t flags = 0;
-
-       req = tevent_req_create(mem_ctx, &state,
-                               struct smb_direct_write_pdu_state);
-       if (req == NULL) {
-               return NULL;
-       }
-       state->c = c;
-       talloc_set_destructor(state, smb_direct_write_pdu_destructor);
+       bool ok;
 
-       /* first check if the input is ok */
-#ifdef IOV_MAX
-       if (count > IOV_MAX) {
-               tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_MIX);
-               return tevent_req_post(req, ev);
-       }
-#endif
-
-       for (i=0; i < count; i++) {
-               size_t tmp = to_write;
-               tmp += vector[i].iov_len;
-
-               if (tmp < to_write) {
-                       tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_MIX);
-                       return tevent_req_post(req, ev);
+ do_write:
+       if (fde_flags & TEVENT_FD_WRITE) {
+               if (c->r2s.out != NULL) {
+                       /* continue with the partially written io */
+                       io = c->r2s.out;
+               } else if (c->r2s.ready != NULL) {
+                       io = c->r2s.ready;
+                       DLIST_REMOVE(c->r2s.ready, io);
+                       c->r2s.out = io;
+               } else {
+                       /*
+                        * Nothing is queued for the socket. We must not
+                        * pass a NULL element to DLIST_REMOVE() and we
+                        * must not reuse the io of a previous round.
+                        */
+                       io = NULL;
                }
 
-               to_write = tmp;
-       }
+               if (io != NULL) {
+                       ssize_t sret;
 
-       if (to_write == 0) {
-               tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_MIX);
-               return tevent_req_post(req, ev);
-       }
+                       sret = writev(c->sock.fd, io->iov, io->iov_count);
+                       if (sret == -1) {
+                               if (errno == EAGAIN) { /* TODO: and more... */
+                                       TEVENT_FD_WRITEABLE(c->sock.fde);
+                                       goto done_write;
+                               }
 
-       if (to_write > state->c->state.max_fragmented_size) {
-               tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_MIX);
-               return tevent_req_post(req, ev);
-       }
+                               status = map_nt_error_from_unix_common(errno);
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, (int)sret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
+                       }
 
-       if (state->c->state.send_credits == 0) {
-               tevent_req_nterror(req, NT_STATUS_INVALID_PARAMETER_MIX);
-               return tevent_req_post(req, ev);
-       }
+                       ok = iov_advance(&io->iov, &io->iov_count, sret);
+                       if (!ok) {
+                               /* not a syscall failure, errno is stale here */
+                               status = NT_STATUS_INTERNAL_ERROR;
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, (int)sret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
+                       }
 
-       current_len = MIN(state->c->state.max_send_size - 0x18, to_write);
-       remaining_len = to_write - current_len;
+                       if (io->iov_count == 0) {
+                               /* completely written, try the next one */
+                               c->r2s.out = NULL;
+                               DLIST_ADD_END(c->r2s.idle, io);
+                               goto do_write;
+                       }
+               } else {
+                       TEVENT_FD_NOT_WRITEABLE(c->sock.fde);
+               }
+       }
 
-       buf_len = current_len;
-       state->buffer = talloc_zero_array(state, uint8_t, buf_len);
-       if (tevent_req_nomem(state->buffer, req)) {
-               return tevent_req_post(req, ev);
+ done_write:
+       ret = smb_direct_connection_post_recv(c);
+       if (ret != 0) {
+               status = map_nt_error_from_unix_common(errno);
+               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                       __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+               smb_direct_connection_disconnect(c, status);
+               return;
        }
 
-       granted = state->c->state.receive_credit_max;
-       granted -= state->c->state.receive_credits;
-       granted = MIN(granted, state->c->state.receive_credit_target);
-       state->c->state.receive_credits += granted;
-       state->c->state.send_credits -= 1;
+ do_read:
+       if (fde_flags & TEVENT_FD_READ) {
+               if (c->s2r.in != NULL) {
+                       /* continue with the partially filled io */
+                       io = c->s2r.in;
+               } else if (c->s2r.idle != NULL) {
+                       io = c->s2r.idle;
+                       DLIST_REMOVE(c->s2r.idle, io);
+                       c->s2r.in = io;
+
+                       if (c->s2r.remaining_length > 0) {
+                               /*
+                                * We need to continue to get
+                                * the incomplete packet.
+                                */
+                               io->data_length = MIN(sizeof(io->data),
+                                                     c->s2r.remaining_length);
+                               io->remaining_length = c->s2r.remaining_length;
+                               io->remaining_length -= io->data_length;
+                               c->s2r.remaining_length = io->remaining_length;
+
+                               io->iov = io->_iov_array;
+                               io->iov[0].iov_base = io->data;
+                               io->iov[0].iov_len = io->data_length;
+                               io->iov_count = 1;
+                       } else {
+                               /*
+                                * For a new packet we need to get the length
+                                * first.
+                                */
+                               io->data_length = 0;
+                               io->remaining_length = 0;
+
+                               io->iov = io->_iov_array;
+                               io->iov[0].iov_base = io->nbt_hdr;
+                               io->iov[0].iov_len = sizeof(io->nbt_hdr);
+                               io->iov_count = 1;
+                       }
+               } else {
+                       /*
+                        * No idle io is left to read into. We must not
+                        * pass a NULL element to DLIST_REMOVE() or set
+                        * up a NULL io.
+                        */
+                       io = NULL;
+               }
+               if (io != NULL) {
+                       ssize_t sret;
 
-       if (state->c->state.send_credits == 0) {
-               flags |= 0x0001;
-       }
+                       sret = readv(c->sock.fd, io->iov, io->iov_count);
+                       if (sret == -1) {
+                               if (errno == EAGAIN) { /* TODO: and more... */
+                                       /*
+                                        * Wait for the next readable event
+                                        * instead of spinning on readv().
+                                        */
+                                       TEVENT_FD_READABLE(c->sock.fde);
+                                       goto done_read;
+                               }
 
-       SSVAL(state->hdr, 0x00, state->c->state.send_credit_target);
-       SSVAL(state->hdr, 0x02, granted);
-       SSVAL(state->hdr, 0x04, flags);
-       SSVAL(state->hdr, 0x06, 0x0000);
-       SIVAL(state->hdr, 0x08, remaining_len);
-       SIVAL(state->hdr, 0x0C, 0x00000018);
-       SIVAL(state->hdr, 0x10, current_len);
-       SIVAL(state->hdr, 0x14, 0x00000000);
+                               status = map_nt_error_from_unix_common(errno);
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, (int)sret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
+                       }
+                       if (sret == 0) {
+                               /* the peer closed its end of the socket */
+                               status = NT_STATUS_CONNECTION_DISCONNECTED;
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, (int)sret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
+                       }
 
-       ptr = state->buffer;
-       for (i=0; i < count; i++) {
-               const uint8_t *this_buf = (const uint8_t *)vector[i].iov_base;
-               size_t this_len = MIN(current_len, vector[i].iov_len);
+                       ok = iov_advance(&io->iov, &io->iov_count, sret);
+                       if (!ok) {
+                               /* not a syscall failure, errno is stale here */
+                               status = NT_STATUS_INTERNAL_ERROR;
+                               DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                       __location__, __FUNCTION__, (int)sret, errno, nt_errstr(status)));
+                               smb_direct_connection_disconnect(c, status);
+                               return;
+                       }
 
-               memcpy(ptr, this_buf, this_len);
-               ptr += this_len;
-               current_len -= this_len;
-               if (current_len == 0) {
-                       break;
+                       if (io->iov_count == 0) {
+                               if (io->data_length != 0) {
+                                       /*
+                                        * We managed to read the whole fragment
+                                        * which is ready to be posted into the
+                                        * send queue.
+                                        *
+                                        * It has to go to the socket-to-rdma
+                                        * queue, c->r2s.ready would write it
+                                        * back to the socket.
+                                        */
+                                       c->s2r.in = NULL;
+                                       DLIST_ADD_END(c->s2r.ready, io);
+                                       goto do_read;
+                               }
+
+                               c->s2r.remaining_length = smb_len_tcp(io->nbt_hdr);
+                               /* TODO: confirm this is the limit for the correct direction */
+                               if (c->s2r.remaining_length > c->state.max_fragmented_size) {
+                                       status = NT_STATUS_INVALID_BUFFER_SIZE;
+                                       DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
+                                               __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
+                                       smb_direct_connection_disconnect(c, status);
+                                       return;
+                               }
+
+                               io->data_length = MIN(sizeof(io->data),
+                                                     c->s2r.remaining_length);
+                               io->remaining_length = c->s2r.remaining_length;
+                               io->remaining_length -= io->data_length;
+                               c->s2r.remaining_length = io->remaining_length;
+
+                               io->iov = io->_iov_array;
+                               io->iov[0].iov_base = io->data;
+                               io->iov[0].iov_len = io->data_length;
+                               io->iov_count = 1;
+
+                               /*
+                                * try to read the rest immediately.
+                                */
+                               goto do_read;
+                       }
+               } else {
+                       TEVENT_FD_NOT_READABLE(c->sock.fde);
                }
        }
 
-       status = smb_direct_connection_setup_events(state->c, ev);
-       if (!NT_STATUS_IS_OK(status)) {
-               smb_direct_connection_disconnect(state->c, status);
-               tevent_req_nterror(req, status);
-               return tevent_req_post(req, ev);
-       }
-       state->c->reqs.write_pdu_req = req;
-
-       state->hdr_mr = ibv_reg_mr(state->c->ibv.pd,
-                                  state->hdr,
-                                  sizeof(state->hdr),
-                                  0);
-       if (state->hdr_mr == NULL) {
-               smb_direct_connection_disconnect(state->c, NT_STATUS_NO_MEMORY);
-               return req;
-       }
-
-       state->buffer_mr = ibv_reg_mr(state->c->ibv.pd,
-                                     state->buffer,
-                                     state->c->state.max_send_size,
-                                     0);
-       if (state->buffer_mr == NULL) {
-               smb_direct_connection_disconnect(state->c, NT_STATUS_NO_MEMORY);
-               return req;
-       }
-
-       state->sge[0].addr = (uint64_t) (uintptr_t) state->hdr;
-       state->sge[0].length = sizeof(state->hdr);;
-       state->sge[0].lkey = state->hdr_mr->lkey;
-       state->sge[1].addr = (uint64_t) (uintptr_t) state->buffer;
-       state->sge[1].length = buf_len;
-       state->sge[1].lkey = state->buffer_mr->lkey;
-       state->wr.opcode = IBV_WR_SEND;
-       state->wr.send_flags = IBV_SEND_SIGNALED;
-       state->wr.sg_list = state->sge;
-       state->wr.num_sge = ARRAY_SIZE(state->sge);
-
-       errno = 0;
-       ret = ibv_post_send(state->c->ibv.qp, &state->wr, &bad_send_wr);
+ done_read:
+       ret = smb_direct_connection_post_send(c);
        if (ret != 0) {
                status = map_nt_error_from_unix_common(errno);
                DEBUG(0,("%s:%s: ret[%d] errno[%d] status[%s]\n",
                        __location__, __FUNCTION__, ret, errno, nt_errstr(status)));
-               smb_direct_connection_disconnect(state->c, status);
-               return req;
+               smb_direct_connection_disconnect(c, status);
+               return;
        }
-
-       return req;
 }
 
-static NTSTATUS smb_direct_write_pdu_recv(struct tevent_req *req)
+/*
+ * (Re)register the tevent fd handlers of the connection on ev.
+ *
+ * Handlers are added for the rdma cm channel, the ibv completion channel
+ * and the socket. The socket is always watched for TEVENT_FD_READ and
+ * additionally for TEVENT_FD_WRITE while c->r2s.out is pending.
+ *
+ * Existing handlers are dropped first when one of the channel fdes
+ * reports no flags, or when ev is NULL. With ev == NULL nothing is
+ * registered again.
+ *
+ * Returns NT_STATUS_OK if the handlers are in place on ev (or were
+ * removed for ev == NULL), NT_STATUS_INVALID_PARAMETER_MIX if a different
+ * event context is already in use and NT_STATUS_NO_MEMORY if
+ * tevent_add_fd() fails.
+ */
+NTSTATUS smb_direct_connection_setup_events(struct smb_direct_connection *c,
+                                           struct tevent_context *ev)
 {
-       struct smb_direct_write_pdu_state *state =
-               tevent_req_data(req,
-               struct smb_direct_write_pdu_state);
-       NTSTATUS status;
-
-       state->c->reqs.write_pdu_req = NULL;
+       uint16_t sock_fde_flags = TEVENT_FD_READ;
 
-       if (tevent_req_is_nterror(req, &status)) {
-               tevent_req_received(req);
-               return status;
+       if (c->r2s.out != NULL) {
+               /* a partially written io is waiting for the socket */
+               sock_fde_flags |= TEVENT_FD_WRITE;
        }
 
-       tevent_req_received(req);
-       return NT_STATUS_OK;
-}
-
-struct smb_direct_read_pdu_state {
-       struct smb_direct_connection *c;
-       uint8_t *inbuf;
-};
-
-static int smb_direct_read_pdu_destructor(
-               struct smb_direct_read_pdu_state *state)
-{
-       if (state->c != NULL) {
-               state->c->reqs.read_pdu_req = NULL;
+       if (tevent_fd_get_flags(c->ibv.fde_channel) == 0) {
+               c->last_ev = NULL;
+               TALLOC_FREE(c->sock.fde);
+               TALLOC_FREE(c->ibv.fde_channel);
+               TALLOC_FREE(c->rdma.fde_channel);
        }
 
-       return 0;
-}
-
-static struct tevent_req *smb_direct_read_pdu_send(TALLOC_CTX *mem_ctx,
-                                     struct tevent_context *ev,
-                                     struct smb_transport *transport)
-{
-       struct tevent_req *req;
-       struct smb_direct_read_pdu_state *state;
-       struct smb_direct_connection *c =
-               smb_transport_data(transport,
-               struct smb_direct_connection);
-
-       req = tevent_req_create(mem_ctx, &state,
-                               struct smb_direct_read_pdu_state);
-       if (req == NULL) {
-               return NULL;
+       if (tevent_fd_get_flags(c->rdma.fde_channel) == 0) {
+               c->last_ev = NULL;
+               TALLOC_FREE(c->sock.fde);
+               TALLOC_FREE(c->ibv.fde_channel);
+               TALLOC_FREE(c->rdma.fde_channel);
        }
-       state->c = c;
-       talloc_set_destructor(state, smb_direct_read_pdu_destructor);
 
-       state->c->reqs.read_pdu_req = req;
-
-       return req;
-}
-
-static NTSTATUS smb_direct_read_pdu_recv(struct tevent_req *req,
-                                               TALLOC_CTX *mem_ctx,
-                                               struct iovec *vector)
-{
-       struct smb_direct_read_pdu_state *state =
-               tevent_req_data(req,
-               struct smb_direct_read_pdu_state);
-       NTSTATUS status;
-
-       state->c->reqs.read_pdu_req = NULL;
-
-       if (tevent_req_is_nterror(req, &status)) {
-               tevent_req_received(req);
-               return status;
+       if (ev == NULL) {
+               c->last_ev = NULL;
+               TALLOC_FREE(c->sock.fde);
+               TALLOC_FREE(c->ibv.fde_channel);
+               TALLOC_FREE(c->rdma.fde_channel);
+               /*
+                * There is no event context to register on,
+                * tevent_add_fd() must not be called with ev == NULL.
+                */
+               return NT_STATUS_OK;
+       } else if (ev == c->last_ev) {
+               /* already registered on this event context */
+               return NT_STATUS_OK;
+       } else if (c->last_ev == NULL) {
+               /* fallthrough */
+       } else {
+               /* handlers are active on a different event context */
+               return NT_STATUS_INVALID_PARAMETER_MIX;
        }
 
-       vector->iov_len = talloc_get_size(state->c->inbuf.full_buffer);
-       _smb_setlen_tcp(state->c->inbuf.full_buffer, vector->iov_len - 4);
-       vector->iov_base = talloc_move(mem_ctx, &state->c->inbuf.full_buffer);
+       c->rdma.fde_channel = tevent_add_fd(ev, c,
+                                           c->rdma.cm_channel->fd,
+                                           TEVENT_FD_READ,
+                                           smb_direct_connection_rdma_handler,
+                                           c);
+       if (c->rdma.fde_channel == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
+       c->ibv.fde_channel = tevent_add_fd(ev, c,
+                                          c->ibv.comp_channel->fd,
+                                          TEVENT_FD_READ,
+                                          smb_direct_connection_ibv_handler,
+                                          c);
+       if (c->ibv.fde_channel == NULL) {
+               TALLOC_FREE(c->rdma.fde_channel);
+               return NT_STATUS_NO_MEMORY;
+       }
+       c->sock.fde = tevent_add_fd(ev, c, c->sock.fd,
+                                   sock_fde_flags,
+                                   smb_direct_connection_sock_handler,
+                                   c);
+       if (c->sock.fde == NULL) {
+               TALLOC_FREE(c->rdma.fde_channel);
+               TALLOC_FREE(c->ibv.fde_channel);
+               return NT_STATUS_NO_MEMORY;
+       }
 
-       tevent_req_received(req);
+       c->last_ev = ev;
        return NT_STATUS_OK;
 }
 
-static const struct smb_transport_ops smb_direct_ops = {
-       .name                   = "smbdirect",
-
-       .read_pdu_send          = smb_direct_read_pdu_send,
-       .read_pdu_recv          = smb_direct_read_pdu_recv,
-
-       .write_pdu_send         = smb_direct_write_pdu_send,
-       .write_pdu_recv         = smb_direct_write_pdu_recv,
-};
-
 #endif /* SMB_TRANSPORT_ENABLE_RDMA */
index 07ed42b4248d22021d7608aebf04a3e83aa567c4..b13d0ec2ad5ce3d1eca665eed0d77adb9871b0da 100644 (file)
@@ -25,12 +25,15 @@ struct smb_direct_connection;
 
 struct smb_direct_connection *smb_direct_connection_create(TALLOC_CTX *mem_ctx);
 
-struct tevent_req *smb_direct_connect_send(TALLOC_CTX *mem_ctx,
-                                          struct tevent_context *ev,
-                                          struct smb_direct_connection *c,
-                                          const struct sockaddr_storage *src,
-                                          const struct sockaddr_storage *dst);
-int smb_direct_connect_recv(struct tevent_req *req, int *fd);
+struct tevent_req *smb_direct_connection_connect_send(TALLOC_CTX *mem_ctx,
+                                                     struct tevent_context *ev,
+                                                     struct smb_direct_connection *c,
+                                                     const struct sockaddr_storage *src,
+                                                     const struct sockaddr_storage *dst);
+NTSTATUS smb_direct_connection_connect_recv(struct tevent_req *req, int *fd);
+
+NTSTATUS smb_direct_connection_setup_events(struct smb_direct_connection *c,
+                                           struct tevent_context *ev);
 
 #define SMB_DIRECT_LISTEN_BACKLOG 100
 
index 1e7c47c26e099a13cc4b9364afc1fe6e9b170e53..04ace86000fce9ac3cb46317fc7abea5b2ef7387 100644 (file)
@@ -22,6 +22,7 @@
 #include "system/filesys.h"
 #include "system/network.h"
 #include <tevent.h>
+#include "lib/util/tevent_ntstatus.h"
 #include "lib/util/tevent_unix.h"
 #include "lib/util/tevent_ntstatus.h"
 #include "lib/util/blocking.h"