#define IBW_LASTERR_BUFSIZE 512
static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
-static ibw_mr *ibw_alloc_mr(ibw_ctx_priv *pctx)
+static int ibw_init_memory(ibw_conn *conn)
{
+ ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+
+ int i, num_msg;
+ ibw_wr *p;
+
+ /* didn't find any reason to split send & recv buffer handling */
+ num_msg = pctx->opts.max_recv_wr + pctx->opts.max_send_wr;
+
+ pconn->buf = memalign(pctx->page_size, pctx->opts.max_msg_size);
+ if (!pconn->buf) {
+ sprintf(ibw_lasterr, "couldn't allocate work buf\n");
+ return -1;
+ }
+ pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf, pctx->opts.bufsize, IBV_ACCESS_LOCAL_WRITE);
+ if (!pconn->mr) {
+ sprintf(ibw_lasterr, "Couldn't allocate mr\n");
+ return -1;
+ }
+
+ pconn->wr_index = talloc_size(pconn, num_msg * sizeof(ibw_wr *));
+
+ for(i=0; i<num_msg; i++) {
+ p = pconn->wr_index[i] = talloc_zero(pconn, ibw_wr);
+ p->msg = pconn->buf + (i * pconn->opts.max_msg_size);
+ p->wr_id = i;
+
+ DLIST_ADD(pconn->mr_list_avail, p);
+ }
+
+ return 0;
}
static int ibw_ctx_priv_destruct(void *ptr)
ibw_ctx *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
assert(pctx!=NULL);
- /* free memory regions */
-
- /* destroy verbs */
- if (pctx->cq) {
- ibv_destroy_cq(pctx->cq);
- pctx->cq = NULL;
- }
-
if (pctx->verbs_channel) {
ibv_destroy_comp_channel(pctx->verbs_channel);
pctx->verbs_channel = NULL;
ibw_ctx *ctx = talloc_get_type(ptr, ibw_ctx);
assert(ctx!=NULL);
- if (pconn->cm_id) {
- rdma_destroy_id(pconn->cm_id);
- pconn->cm_id = NULL;
- }
-
- /* free memory regions */
-
return 0;
}
{
ibw_conn *pconn = talloc_get_type(ptr, ibw_conn_priv);
assert(pconn!=NULL);
+
+ /* free memory regions */
+ if (pconn->mr) {
+ ibv_dereg_mr(pconn->mr);
+ pconn->mr = NULL;
+ }
+ if (pconn->buf) {
+ free(pconn->buf); /* memalign-ed */
+ pconn->buf = NULL;
+ }
+
+ /* pconn->wr_index is freed by talloc */
+ /* pconn->wr_index[i] are freed by talloc */
+
+ /* destroy verbs */
+ if (pconn->cm_id->qp) {
+ ibv_destroy_qp(pconn->qp);
+ pconn->qp = NULL;
+ }
+ if (pconn->cq) {
+ ibv_destroy_cq(pconn->cq);
+ pconn->cq = NULL;
+ }
+ if (pconn->cm_id) {
+ rdma_destroy_id(pctx->cm_id);
+ pctx->cm_id = NULL;
+ }
}
static int ibw_conn_destruct(void *ptr)
return conn;
}
-static int ibw_manage_connect(struct rdma_cm_id *cma_id)
+static int ibw_setup_cq_qp(ibw_conn *conn)
+{
+ ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+ struct ibv_qp_init_attr init_attr;
+ int rc;
+
+ if (ibw_init_memory(conn))
+ return -1;
+
+ pctx->cq = ibv_create_cq(conn->cm_id->verbs, pctx->opts.max_send_wr + pctx->opts.max_recv_wr,
+ ctx, ctx->verbs_channel, 0);
+ if (cq==NULL) {
+ sprintf(ibw_lasterr, "ibv_create_cq failed\n");
+ return -1;
+ }
+
+ rc = ibv_req_notify_cq(pctx->cq, 0);
+ if (rc) {
+ sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
+ return rc;
+ }
+
+ memset(&init_attr, 0, sizeof(init_attr));
+ init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
+ init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
+ init_attr.cap.max_recv_sge = 1;
+ init_attr.cap.max_send_sge = 1;
+ init_attr.qp_type = IBV_QPT_RC;
+ init_attr.send_cq = ctx->cq;
+ init_attr.recv_cq = ctx->cq;
+
+ rc = rdma_create_qp(conn->cm_id, pctx->pd, &init_attr);
+ if (rc) {
+ sprintf(ibw_lasterr, "rdma_create_qp (%d) failed with %d\n", is_server, rc);
+ return rc;
+ }
+ /* elase result is in pconn->cm_id->qp */
+
+ return rc;
+}
+
+static void ibw_refill_cq(ibw_conn *conn)
+{
+}
+
+static int ibw_manage_connect(ibw_conn *conn, struct rdma_cm_id *cma_id)
{
struct rdma_conn_param conn_param;
int rc;
- /* TODO: setup verbs... */
+ rc = ibw_setup_cq_qp(conn);
+ if (rc)
+ return -1;
/* cm connect */
memset(&conn_param, 0, sizeof conn_param);
assert(ctx!=NULL);
- rc = rdma_get_cm_event(cb->cm_channel, &event);
+ rc = rdma_get_cm_event(pctx->cm_channel, &event);
if (rc) {
ctx->state = IBWS_ERROR;
sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
pctx->state = IWINT_ADDR_RESOLVED;
rc = rdma_resolve_route(cma_id, 2000);
if (rc) {
- cb->state = ERROR;
+ ctx->state = ERROR;
sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
DEBUG(0, ibw_lasterr);
}
conn = talloc_get_type(cma_id->context, ibw_conn);
pconn = talloc_get_type(conn->internal, ibw_conn_priv);
- rc = ibw_manage_connect(cma_id);
+ rc = ibw_manage_connect(conn, cma_id);
if (rc)
error = 1;
if (!pconn->is_accepted) {
talloc_free(conn);
DEBUG(10, "pconn->cm_id %p wasn't accepted\n", pconn->cm_id);
+ } else {
+ if (ibw_setup_cq_qp(ctx, conn))
+ error = 1;
}
/* TODO: clarify whether if it's needed by upper layer: */
case RDMA_CM_EVENT_ESTABLISHED:
/* expected after ibw_accept and ibw_connect[not directly] */
- DEBUG(0, "ESTABLISHED\n");
- ctx->state = IBWS_READY;
+ DEBUG(0, "ESTABLISHED (conn: %u)\n", cma_id->context);
conn = talloc_get_type(cma_id->context, ibw_conn);
assert(conn!=NULL); /* important assumption */
pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+ /* client conn is up */
conn->state = IBWC_CONNECTED;
/* both ctx and conn have changed */
pctx->connstate_func(NULL, conn);
talloc_free(conn);
+
+ if (ctx->conn_list==NULL)
+ rdma_disconnect(ctx->cm_id);
} else {
DEBUG(0, "server DISCONNECT event\n");
ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
+ /* talloc_free(ctx) should be called within or after this func */
pctx->connstate_func(ctx, NULL);
}
break;
static void ibw_event_handler_verbs(struct event_context *ev,
struct fd_event *fde, uint16_t flags, void *private_data)
{
- int rc;
ibw_ctx *ctx = talloc_get_type(private_data, ibw_ctx);
ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, ibw_ctx_priv);
+
}
static int ibw_process_init_attrs(ibw_initattr *attr, int nattr, ibw_opts *opts)
{
- int i;
+ int i, mtu;
char *name, *value;
-
+
+ opts->max_send_wr = 256;
+ opts->max_recv_wr = 1024;
+ opts->max_msg_size = 1024;
+
for(i=0; i<nattr; i++) {
name = attr[i].name;
value = attr[i].value;
assert(name!=NULL && value!=NULL);
- if (strcmp(name, "dev_name")==0)
- opts->opts.dev_name = talloc_strdup(ctx, value);
- else if (strcmp(name, "rx_depth")==0)
- opts->rx_depth = atoi(value);
- else if (strcmp(name, "mtu")==0)
- opts->mtu = atoi(value);
+ if (strcmp(name, "max_send_wr")==0)
+ opts->max_send_wr = atoi(value);
+ else if (strcmp(name, "max_recv_wr")==0)
+ opts->max_recv_wr = atoi(value);
+ else if (strcmp(name, "max_msg_size")==0)
+ opts->bufsize = atoi(value);
else {
sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
return -1;
/* init cm */
pctx->cm_channel = rdma_create_event_channel();
if (!pctx->cm_channel) {
- ret = errno;
- sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", ret);
+ sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
goto cleanup;
}
pctx->verbs_channel = ibv_create_comp_channel(cm_id->verbs);
if (!pctx->verbs_channel) {
- sprintf(stderr, "ibv_create_comp_channel failed %d\n", errno);
+ sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
goto cleanup;
}
- DEBUG_LOG("created channel %p\n", pctx->channel);
+ DEBUG(10, "created channel %p\n", pctx->channel);
pctx->verbs_channel_event = event_add_fd(pctx->ectx, pctx,
pctx->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, ctx);
- pctx->cq = ibv_create_cq(cm_id->verbs, pctx->opts.rx_depth, ctx,
- ctx->verbs_channel, 0);
-
- /* allocate ib memory regions */
+ pctx->pagesize = sysconf(_SC_PAGESIZE);
return ctx;
-
+ /* don't put code here */
cleanup:
DEBUG(0, ibw_lasterr);
+
if (ctx)
talloc_free(ctx);
int rc;
DEBUG_LOG("rdma_listen...\n");
- rc = rdma_listen(cb->cm_id, backlog);
+ rc = rdma_listen(pctx->cm_id, backlog);
if (rc) {
sprintf(ibw_lasterr, "rdma_listen failed: %d\n", ret);
DEBUG(0, ibw_lasterr);
ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
struct rdma_conn_param conn_param;
+ conn->conn_userdata = conn_userdata;
+
memset(&conn_param, 0, sizeof(struct rdma_conn_param));
conn_param.responder_resources = 1;
conn_param.initiator_depth = 1;
void ibw_disconnect(ibw_conn *conn)
{
ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
-
+ ibw_ctx *ctx = conn->ctx;
+ ibw_ctx_priv *pctx = talloc_get_type(ctx->internal);
+
+ rdma_disconnect(pctx->cm_id);
+
+ /* continued at RDMA_CM_EVENT_DISCONNECTED */
+
return 0;
}
-int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int n)
+int ibw_alloc_send_buf(ibw_conn *conn, void **buf, void **key, int *maxsize)
{
ibw_conn_priv *pconn = talloc_get_type(conn->internal, ibw_conn_priv);
+ ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_wr *p = pctx->wr_list_avail;
- return 0;
+ if (p==NULL) {
+ sprintf(ibw_last_err, "insufficient wr chunks\n");
+ return -1;
+ }
+
+ *maxsize = pctx->opts.max_msg_size;
+
+ DLIST_REMOVE(pctx->wr_list_avail, p);
+ DLIST_ADD(pctx->wr_list_used, p);
+
+ *buf = (void *)p->msg;
+ *key = (void *)p;
+
+ return pctx->buf;
}
int ibw_send(ibw_conn *conn, void *buf, void *key, int n)
{
- ibw_conn_priv *pconn = (ibw_ctx_priv *)ctx->internal;
- return 0;
+ ibw_ctx_priv pctx = talloc_get_type(conn->ctx->internal, ibw_ctx_priv);
+ ibw_wr *p = talloc_get_type(key, ibw_wr);
+ struct ibv_sge list = {
+ .addr = (uintptr_t) p->msg,
+ .length = n,
+ .lkey = pctx->mr->lkey
+ };
+ struct ibv_send_wr wr = {
+ .wr_id = p->wr_id,
+ .sg_list = &list,
+ .num_sge = 1,
+ .opcode = IBV_WR_SEND,
+ .send_flags = IBV_SEND_SIGNALED,
+ };
+ struct ibv_send_wr *bad_wr;
+
+ assert(p->msg==(char *)buf);
+
+ p->conn = conn; /* set it only now */
+
+ return ibv_post_send(conn->qp, &wr, &bad_wr);
}
const char *ibw_getLastError()