2 * Unix SMB/CIFS implementation.
3 * Wrap Infiniband calls.
5 * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006
7 * Major code contributions by Peter Somogyi <psomogyi@gamax.hu>
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 2 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <sys/socket.h>
32 #include <arpa/inet.h>
38 #include "lib/events/events.h"
39 #include "ibwrapper.h"
41 #include <rdma/rdma_cma.h>
43 #include "ibwrapper_internal.h"
44 #include "lib/util/dlinklist.h"
/* Size in bytes of the module-wide last-error message buffer. */
46 #define IBW_LASTERR_BUFSIZE 512
/* Text of the most recent error; the functions in this file fill it with sprintf(). */
47 static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
/* Forward declarations for functions that are referenced before their definitions. */
49 static void ibw_event_handler_verbs(struct event_context *ev,
50 struct fd_event *fde, uint16_t flags, void *private_data);
51 static int ibw_fill_cq(struct ibw_conn *conn);
52 static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
53 static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
/*
 * Allocate an n-byte buffer aligned to pctx->pagesize and register it with
 * the protection domain pctx->pd for local write access. The memory region
 * handle is stored in *ppmr. A message is written to ibw_lasterr for a
 * failed allocation and for a failed registration.
 * NOTE(review): callers compare the returned pointer with NULL; the return
 * statements and failure checks are on lines not visible in this listing.
 */
55 static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
56 int n, struct ibv_mr **ppmr)
/* page-aligned allocation of the raw buffer */
59 buf = memalign(pctx->pagesize, n);
61 sprintf(ibw_lasterr, "couldn't allocate memory\n");
/* register the buffer so the adapter may write into it */
65 *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
67 sprintf(ibw_lasterr, "couldn't allocate mr\n");
/*
 * Takes the address of a buffer pointer and the address of its memory
 * region pointer. It is called from ibw_conn_priv_destruct() for the send
 * and receive buffers and from ibw_wc_send() for a large-message buffer.
 * NOTE(review): the body is not visible in this listing; presumably it
 * deregisters *ppmr and frees *ppbuf - confirm against the full file.
 */
75 static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
/*
 * Set up the per-connection send and receive memory.
 * - buf_send: max_send_wr slots of avg_send_size bytes, registered as mr_send
 * - buf_recv: max_recv_wr slots of recv_bufsize bytes, registered as mr_recv
 * - wr_index: array of max_send_wr ibw_wr pointers; entry i owns slot i of
 *   buf_send and is put on the wr_list_avail list
 * NOTE(review): return statements are on lines not visible in this listing;
 * the caller treats a non-zero result as failure.
 */
87 static int ibw_init_memory(struct ibw_conn *conn)
89 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
90 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
/* one contiguous registered region for all regular-size send slots */
95 pconn->buf_send = ibw_alloc_mr(pctx, pconn,
96 pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
97 if (!pconn->buf_send) {
98 sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
/* one contiguous registered region for all receive slots */
102 pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
103 pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
104 if (!pconn->buf_recv) {
105 sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
/* lookup table from send wr_id to its ibw_wr descriptor (used by ibw_wc_send) */
109 pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
110 assert(pconn->wr_index!=NULL);
112 for(i=0; i<pctx->opts.max_send_wr; i++) {
113 p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
/* descriptor i points at slot i of the shared send buffer */
114 p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
117 DLIST_ADD(pconn->wr_list_avail, p);
/*
 * talloc destructor for the private context (installed in ibw_init()).
 * Releases the protection domain, the CM event channel, the fd event that
 * watches that channel, and the context cm_id.
 * NOTE(review): as listed, rdma_destroy_event_channel() runs before
 * rdma_destroy_id(); the librdmacm documentation asks for ids bound to a
 * channel to be destroyed first - confirm the ordering in the full file.
 */
123 static int ibw_ctx_priv_destruct(struct ibw_ctx_priv *pctx)
126 ibv_dealloc_pd(pctx->pd);
131 if (pctx->cm_channel) {
132 rdma_destroy_event_channel(pctx->cm_channel);
133 pctx->cm_channel = NULL;
135 if (pctx->cm_channel_event) {
136 /* TODO: do we have to do this here? */
137 talloc_free(pctx->cm_channel_event);
138 pctx->cm_channel_event = NULL;
141 rdma_destroy_id(pctx->cm_id);
/*
 * talloc destructor for struct ibw_ctx (installed in ibw_init()).
 * NOTE(review): the body is not visible in this listing.
 */
148 static int ibw_ctx_destruct(struct ibw_ctx *ctx)
/*
 * talloc destructor for the private connection data (installed in
 * ibw_conn_new()). Releases, in listing order: the send and receive
 * memory regions, the queue pair, the completion queue, the completion
 * channel, the fd event watching that channel, and the connection cm_id.
 * NOTE(review): pconn->cm_id is dereferenced below; any NULL guard would be
 * on lines not visible in this listing - confirm.
 */
153 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
155 /* free memory regions */
156 ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
157 ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
159 /* pconn->wr_index is freed by talloc */
160 /* pconn->wr_index[i] are freed by talloc */
/* the queue pair was created through rdma_create_qp() in ibw_setup_cq_qp() */
163 if (pconn->cm_id->qp) {
164 ibv_destroy_qp(pconn->cm_id->qp);
165 pconn->cm_id->qp = NULL;
168 ibv_destroy_cq(pconn->cq);
171 if (pconn->verbs_channel) {
172 ibv_destroy_comp_channel(pconn->verbs_channel);
173 pconn->verbs_channel = NULL;
175 if (pconn->verbs_channel_event) {
176 /* TODO: do we have to do this here? */
177 talloc_free(pconn->verbs_channel_event);
178 pconn->verbs_channel_event = NULL;
181 rdma_destroy_id(pconn->cm_id);
/*
 * talloc destructor for struct ibw_conn (installed in ibw_conn_new()):
 * unlinks the connection from the conn_list of its owning context.
 */
187 static int ibw_conn_destruct(struct ibw_conn *conn)
189 /* important here: ctx is a talloc _parent_ */
190 DLIST_REMOVE(conn->ctx->conn_list, conn);
/*
 * Create a zeroed connection object and a zeroed private connection
 * object, both as talloc children of ctx, install their destructors and
 * add the connection to ctx->conn_list.
 * NOTE(review): callers read conn->internal and conn->ctx right after this
 * function returns, but no assignment of either field is visible in this
 * listing - confirm in the full file that both are set here.
 */
194 static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
196 struct ibw_conn *conn;
197 struct ibw_conn_priv *pconn;
199 conn = talloc_zero(ctx, struct ibw_conn);
201 talloc_set_destructor(conn, ibw_conn_destruct);
/* note: the private part is parented to ctx, not to conn */
203 pconn = talloc_zero(ctx, struct ibw_conn_priv);
205 talloc_set_destructor(pconn, ibw_conn_priv_destruct);
209 DLIST_ADD(ctx->conn_list, conn);
/*
 * Prepare a connection for data transfer:
 *  1. allocate and register the send/receive buffers (ibw_init_memory)
 *  2. create a completion channel and watch its fd for readability with
 *     ibw_event_handler_verbs
 *  3. create one completion queue sized max_recv_wr + max_send_wr, shared
 *     by sends and receives, and request notification on it
 *  4. create a reliable-connected queue pair on the connection cm_id
 *  5. post the initial receive requests (ibw_fill_cq)
 * Returns the result of ibw_fill_cq() on the success path.
 * NOTE(review): the failure checks and error returns are on lines not
 * visible in this listing; the result of event_add_fd() has no visible check.
 */
214 static int ibw_setup_cq_qp(struct ibw_conn *conn)
216 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
217 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
218 struct ibv_qp_init_attr init_attr;
222 if (ibw_init_memory(conn))
226 pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs);
227 if (!pconn->verbs_channel) {
228 sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
231 DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
/* completion events are delivered through the event loop; conn is the callback argument */
233 pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
234 pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
237 pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
238 pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
239 conn, pconn->verbs_channel, 0);
240 if (pconn->cq==NULL) {
241 sprintf(ibw_lasterr, "ibv_create_cq failed\n");
/* second argument 0: notify on any completion, not only solicited ones */
245 rc = ibv_req_notify_cq(pconn->cq, 0);
247 sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
252 memset(&init_attr, 0, sizeof(init_attr));
253 init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
254 init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
255 init_attr.cap.max_recv_sge = 1;
256 init_attr.cap.max_send_sge = 1;
257 init_attr.qp_type = IBV_QPT_RC;
/* the same completion queue serves both directions */
258 init_attr.send_cq = pconn->cq;
259 init_attr.recv_cq = pconn->cq;
261 rc = rdma_create_qp(pconn->cm_id, pctx->pd, &init_attr);
263 sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc);
266 /* else result is in pconn->cm_id->qp */
268 return ibw_fill_cq(conn);
/*
 * Post a single receive request for the slot at pconn->recv_index and
 * advance recv_index modulo max_recv_wr. Called from ibw_wc_recv() after a
 * received buffer has been processed.
 * Receive wr_ids are encoded as max_send_wr + slot index so that they do
 * not collide with send wr_ids (which ibw_wc_send asserts are below
 * max_send_wr).
 * NOTE(review): the initializer of wr and the return statements are on
 * lines not visible in this listing.
 */
271 static int ibw_refill_cq_recv(struct ibw_conn *conn)
273 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
274 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
276 struct ibv_sge list = {
277 .addr = (uintptr_t) NULL,
278 .length = pctx->opts.recv_bufsize,
279 .lkey = pconn->mr_recv->lkey
281 struct ibv_recv_wr wr = {
286 struct ibv_recv_wr *bad_wr;
/* address of the slot inside the shared receive buffer */
288 list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
289 wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
290 pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
292 rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
294 sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
/* NOTE(review): ibw_lasterr is passed as the format string here */
295 DEBUG(0, (ibw_lasterr));
/*
 * Post max_recv_wr receive requests, one per slot of the receive buffer,
 * using the same slot addressing and wr_id encoding as
 * ibw_refill_cq_recv(). Called at the end of ibw_setup_cq_qp().
 * NOTE(review): the initializer of wr and the return statements are on
 * lines not visible in this listing.
 */
302 static int ibw_fill_cq(struct ibw_conn *conn)
304 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
305 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
307 struct ibv_sge list = {
308 .addr = (uintptr_t) NULL,
309 .length = pctx->opts.recv_bufsize,
310 .lkey = pconn->mr_recv->lkey
312 struct ibv_recv_wr wr = {
317 struct ibv_recv_wr *bad_wr;
/* counts down; the slot actually used comes from pconn->recv_index */
319 for(i = pctx->opts.max_recv_wr; i!=0; i--) {
320 list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
321 wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
322 pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
324 rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
326 sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
/* NOTE(review): ibw_lasterr is passed as the format string here */
327 DEBUG(0, (ibw_lasterr));
/*
 * Client-side continuation after route resolution: set up the completion
 * queue and queue pair for conn, then issue rdma_connect() on cma_id with
 * responder_resources 1, initiator_depth 1 and retry_count 10.
 * NOTE(review): the checks of rc and the return statements are on lines
 * not visible in this listing.
 */
335 static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
337 struct rdma_conn_param conn_param;
340 rc = ibw_setup_cq_qp(conn);
345 memset(&conn_param, 0, sizeof conn_param);
346 conn_param.responder_resources = 1;
347 conn_param.initiator_depth = 1;
348 conn_param.retry_count = 10;
350 rc = rdma_connect(cma_id, &conn_param);
352 sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
/*
 * Event-loop callback for the RDMA connection manager channel (registered
 * in ibw_init() with ctx as private_data). Fetches one CM event, dispatches
 * on its type, acknowledges it, and on failure reports an error state
 * through pctx->connstate_func.
 *
 * Visible dispatch:
 *  ADDR_RESOLVED    -> rdma_resolve_route()
 *  ROUTE_RESOLVED   -> ibw_manage_connect()
 *  CONNECT_REQUEST  -> new ibw_conn, ask the upper layer (which may call
 *                      ibw_accept), then ibw_setup_cq_qp()
 *  ESTABLISHED      -> conn->state = IBWC_CONNECTED, notify upper layer
 *  *_ERROR, UNREACHABLE, REJECTED, DEVICE_REMOVAL, unknown -> error text
 *  DISCONNECTED     -> per-connection or whole-context notification
 *
 * NOTE(review): the assignment of cma_id, the rc checks, the break and goto
 * statements and the error label are on lines not visible in this listing.
 */
357 static void ibw_event_handler_cm(struct event_context *ev,
358 struct fd_event *fde, uint16_t flags, void *private_data)
361 struct ibw_ctx *ctx = talloc_get_type(private_data, struct ibw_ctx);
362 struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
363 struct ibw_conn *conn = NULL;
364 struct ibw_conn_priv *pconn = NULL;
365 struct rdma_cm_id *cma_id = NULL;
366 struct rdma_cm_event *event = NULL;
370 rc = rdma_get_cm_event(pctx->cm_channel, &event);
372 ctx->state = IBWS_ERROR;
373 sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
/* an id equal to pctx->cm_id is the context (parent) id; any other id belongs to a connection */
378 DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event->event, cma_id,
379 (cma_id == pctx->cm_id) ? "parent" : "child"));
381 switch (event->event) {
382 case RDMA_CM_EVENT_ADDR_RESOLVED:
383 /* continuing from ibw_connect ... */
384 rc = rdma_resolve_route(cma_id, 2000);
386 sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
389 /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
392 case RDMA_CM_EVENT_ROUTE_RESOLVED:
393 /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
394 assert(cma_id->context!=NULL);
395 conn = talloc_get_type(cma_id->context, struct ibw_conn);
397 rc = ibw_manage_connect(conn, cma_id);
403 case RDMA_CM_EVENT_CONNECT_REQUEST:
404 ctx->state = IBWS_CONNECT_REQUEST;
405 conn = ibw_conn_new(ctx);
406 pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
407 pconn->cm_id = cma_id; /* !!! event will be freed but id not */
408 cma_id->context = (void *)conn;
409 DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id));
411 conn->state = IBWC_INIT;
412 pctx->connstate_func(ctx, conn);
414 /* continued at ibw_accept when invoked by the func above */
415 if (!pconn->is_accepted) {
417 DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
419 if (ibw_setup_cq_qp(conn))
423 /* TODO: clarify whether it's needed by the upper layer: */
424 ctx->state = IBWS_READY;
425 pctx->connstate_func(ctx, NULL);
427 /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
430 case RDMA_CM_EVENT_ESTABLISHED:
431 /* expected after ibw_accept and ibw_connect[not directly] */
/* NOTE(review): the cast of a pointer to unsigned int loses bits where pointers are wider than int; %p would avoid that */
432 DEBUG(0, ("ESTABLISHED (conn: %u)\n", (unsigned int)cma_id->context));
433 conn = talloc_get_type(cma_id->context, struct ibw_conn);
434 assert(conn!=NULL); /* important assumption */
436 /* client conn is up */
437 conn->state = IBWC_CONNECTED;
439 /* both ctx and conn have changed */
440 pctx->connstate_func(ctx, conn);
443 case RDMA_CM_EVENT_ADDR_ERROR:
444 case RDMA_CM_EVENT_ROUTE_ERROR:
445 case RDMA_CM_EVENT_CONNECT_ERROR:
446 case RDMA_CM_EVENT_UNREACHABLE:
447 case RDMA_CM_EVENT_REJECTED:
448 sprintf(ibw_lasterr, "cma event %d, error %d\n", event->event, event->status);
451 case RDMA_CM_EVENT_DISCONNECTED:
452 if (cma_id!=pctx->cm_id) {
453 DEBUG(0, ("client DISCONNECT event\n"));
454 conn = talloc_get_type(cma_id->context, struct ibw_conn);
455 conn->state = IBWC_DISCONNECTED;
456 pctx->connstate_func(NULL, conn);
460 /* if we are the last... */
461 if (ctx->conn_list==NULL)
462 rdma_disconnect(pctx->cm_id);
464 DEBUG(0, ("server DISCONNECT event\n"));
465 ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
466 /* talloc_free(ctx) should be called within or after this func */
467 pctx->connstate_func(ctx, NULL);
471 case RDMA_CM_EVENT_DEVICE_REMOVAL:
472 sprintf(ibw_lasterr, "cma detected device removal!\n");
476 sprintf(ibw_lasterr, "unknown event %d\n", event->event);
/* every event obtained from rdma_get_cm_event() is acknowledged here */
480 if ((rc=rdma_ack_cm_event(event))) {
481 sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
/* error reporting: a child id maps to a connection error, the parent id to a context error */
487 DEBUG(0, ("cm event handler: %s", ibw_lasterr));
488 if (cma_id!=pctx->cm_id) {
489 conn = talloc_get_type(cma_id->context, struct ibw_conn);
491 conn->state = IBWC_ERROR;
492 pctx->connstate_func(NULL, conn);
494 ctx->state = IBWS_ERROR;
495 pctx->connstate_func(ctx, NULL);
/*
 * Event-loop callback for a connection completion channel (registered in
 * ibw_setup_cq_qp() with conn as private_data). Takes the pending CQ event,
 * re-arms notification, then polls the completion queue one entry at a
 * time and dispatches on the completion opcode: send completions go to
 * ibw_wc_send(), receive completions to ibw_wc_recv(); RDMA read/write
 * completions are only logged. On failure the connection is put into
 * IBWC_ERROR and the upper layer is notified.
 *
 * NOTE(review): no ibv_ack_cq_events() call is visible in this listing;
 * events returned by ibv_get_cq_event() need to be acknowledged before the
 * CQ can be destroyed - confirm against the full file.
 * NOTE(review): the rc checks, the switch header, several case labels, the
 * break and goto statements and the error label are on lines not visible.
 */
499 static void ibw_event_handler_verbs(struct event_context *ev,
500 struct fd_event *fde, uint16_t flags, void *private_data)
502 struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn);
503 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
504 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
508 struct ibv_cq *ev_cq;
511 /* TODO: check whether it's good to have more channels here... */
512 rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
514 sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
/* the channel serves exactly one CQ, so any other CQ is an error */
517 if (ev_cq != pconn->cq) {
/* NOTE(review): pointers are cast to unsigned int for %u; this loses bits where pointers are wider than int */
518 sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
519 (unsigned int)ev_cq, (unsigned int)pconn->cq);
/* re-arm notification before draining the queue */
522 rc = ibv_req_notify_cq(pconn->cq, 0);
524 sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
528 while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
530 sprintf(ibw_lasterr, "cq completion failed status %d\n",
537 DEBUG(10, ("send completion\n"));
538 if (ibw_wc_send(conn, &wc))
542 case IBV_WC_RDMA_WRITE:
543 DEBUG(10, ("rdma write completion\n"));
546 case IBV_WC_RDMA_READ:
547 DEBUG(10, ("rdma read completion\n"));
551 DEBUG(10, ("recv completion\n"));
552 if (ibw_wc_recv(conn, &wc))
557 sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
562 sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
/* NOTE(review): ibw_lasterr is passed as the format string here */
568 DEBUG(0, (ibw_lasterr));
569 conn->state = IBWC_ERROR;
570 pctx->connstate_func(NULL, conn);
/*
 * Handle a send completion: look up the ibw_wr descriptor by wc->wr_id,
 * release its large-message buffer via ibw_free_mr(), and move the
 * descriptor from the used list back to the available list.
 * NOTE(review): any condition guarding the ibw_free_mr() call and the
 * return statement are on lines not visible in this listing.
 */
573 static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
575 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
576 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
579 assert(pconn->cm_id->qp->qp_num==wc->qp_num);
/* send wr_ids index wr_index directly, so they must be below max_send_wr */
580 assert(wc->wr_id < pctx->opts.max_send_wr);
582 p = pconn->wr_index[wc->wr_id];
584 ibw_free_mr(&p->msg_large, &p->mr_large);
587 DLIST_REMOVE(pconn->wr_list_used, p);
588 DLIST_ADD(pconn->wr_list_avail, p);
/*
 * Append add_len bytes read from *pp to the partial-message buffer in
 * part, growing the buffer when part->len + add_len exceeds
 * part->bufsize. Afterwards part->len is increased and part->to_read is
 * decreased by add_len. The info argument is only used to tag error
 * messages.
 * NOTE(review): on realloc failure part->buf is overwritten with NULL, so
 * the previous buffer pointer is lost.
 * NOTE(review): the error returns, the else keyword, any update of *pp and
 * the final return are on lines not visible in this listing.
 */
593 static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
594 char **pp, uint32_t add_len, int info)
596 /* allocate more if necessary - it's an "evergrowing" buffer... */
597 if (part->len + add_len > part->bufsize) {
598 if (part->buf==NULL) {
599 assert(part->len==0);
600 part->buf = talloc_size(memctx, add_len);
601 if (part->buf==NULL) {
602 sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
606 part->bufsize = add_len;
608 part->buf = talloc_realloc_size(memctx,
609 part->buf, part->len + add_len);
610 if (part->buf==NULL) {
611 sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
612 part->len, add_len, info);
616 part->bufsize = part->len + add_len;
620 memcpy(part->buf + part->len, *pp, add_len);
622 part->len += add_len;
623 part->to_read -= add_len;
/*
 * Cap the size of the partial-message buffer: when part->bufsize exceeds
 * threshold, the buffer is freed and replaced by a fresh one of exactly
 * threshold bytes. The old contents are discarded; the visible caller
 * invokes this right after resetting part->len to 0.
 * NOTE(review): the return statements are on lines not visible in this
 * listing.
 */
628 static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
630 if (part->bufsize > threshold) {
631 talloc_free(part->buf);
632 part->buf = talloc_size(memctx, threshold);
633 if (part->buf==NULL) {
634 sprintf(ibw_lasterr, "talloc_size failed\n");
637 part->bufsize = threshold;
/*
 * Handle a receive completion: locate the receive slot from wc->wr_id,
 * split its wc->byte_len bytes into length-prefixed messages and hand each
 * complete message to pctx->receive_func. A message that spans several
 * receive buffers is collected in a struct ibw_part until to_read reaches
 * 0. Finally one receive request is re-posted (ibw_refill_cq_recv). On
 * failure the connection state is set to IBWC_ERROR.
 *
 * Each message starts with a uint32_t length that includes the prefix
 * itself (values below sizeof(uint32_t) are rejected).
 * NOTE(review): the prefix is read here without ntohl() while ibw_send()
 * stores it with htonl() - confirm the intended byte order.
 * NOTE(review): the loop header, the assignment of part, several goto and
 * return statements and the error label are on lines not visible in this
 * listing.
 */
642 static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
644 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
645 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
649 struct ibw_part *part;
651 assert(pconn->cm_id->qp->qp_num==wc->qp_num);
/*
 * Receive requests are posted with wr_id = max_send_wr + recv_index and
 * recv_index cycles through 0..max_recv_wr-1, so wr_id == max_send_wr is a
 * valid completion (slot 0). The comparison must therefore be >=, not >.
 */
652 assert((int)wc->wr_id >= pctx->opts.max_send_wr);
653 recv_index = (int)wc->wr_id - pctx->opts.max_send_wr;
654 assert(recv_index < pctx->opts.max_recv_wr);
655 assert(wc->byte_len <= pctx->opts.recv_bufsize);
/* start of the slot that has just been filled */
657 p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize);
660 remain = wc->byte_len;
662 /* here always true: (part->len!=0 && part->to_read!=0) ||
663 (part->len==0 && part->to_read==0) */
664 if (part->len) { /* is there a partial msg to be continued? */
665 int read_len = (part->to_read<=remain) ? part->to_read : remain;
666 if (ibw_append_to_part(pconn, part, &p, read_len, 421))
/* only the length prefix has been collected so far: decode it now */
670 if (part->len<=sizeof(uint32_t) && part->to_read==0) {
671 assert(part->len==sizeof(uint32_t));
672 /* set it again now... */
673 part->to_read = *((uint32_t *)(part->buf));
674 if (part->to_read<sizeof(uint32_t)) {
675 sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
678 part->to_read -= sizeof(uint32_t); /* it's already read */
681 if (part->to_read==0) {
682 pctx->receive_func(conn, part->buf, part->len);
683 part->len = 0; /* tells not having partial data (any more) */
684 if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
688 if (remain>=sizeof(uint32_t)) {
689 uint32_t msglen = *(uint32_t *)p;
690 if (msglen<sizeof(uint32_t)) {
691 sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
695 /* mostly awaited case: */
696 if (msglen<=remain) {
697 pctx->receive_func(conn, p, msglen);
701 part->to_read = msglen;
702 /* part->len is already 0 */
703 if (ibw_append_to_part(pconn, part, &p, remain, 422))
705 remain = 0; /* to be continued ... */
706 /* part->to_read > 0 here */
708 } else { /* edge case: */
709 part->to_read = sizeof(uint32_t);
710 /* part->len is already 0 */
711 if (ibw_append_to_part(pconn, part, &p, remain, 423))
714 /* part->to_read > 0 here */
717 } /* <remain> is always decreased at least by 1 */
/* hand one receive request back to the queue pair */
719 if (ibw_refill_cq_recv(conn))
725 DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
726 conn->state = IBWC_ERROR;
/*
 * Fill *opts with defaults and then override them from the nattr
 * name/value pairs in attr. Recognised names: max_send_wr, max_recv_wr,
 * avg_send_size, recv_bufsize, recv_threshold. Values are converted with
 * atoi(). An unrecognised name produces an error message in ibw_lasterr.
 *
 * Defaults: max_send_wr 256, max_recv_wr 1024, avg_send_size 1024,
 * recv_bufsize 256, recv_threshold 1 MiB.
 * NOTE(review): atoi() gives no error indication for malformed values.
 * NOTE(review): the assignment of name, the final else and the return
 * statements are on lines not visible in this listing.
 */
730 static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
733 const char *name, *value;
735 opts->max_send_wr = 256;
736 opts->max_recv_wr = 1024;
737 opts->avg_send_size = 1024;
738 opts->recv_bufsize = 256;
739 opts->recv_threshold = 1 * 1024 * 1024;
741 for(i=0; i<nattr; i++) {
743 value = attr[i].value;
745 assert(name!=NULL && value!=NULL);
746 if (strcmp(name, "max_send_wr")==0)
747 opts->max_send_wr = atoi(value);
748 else if (strcmp(name, "max_recv_wr")==0)
749 opts->max_recv_wr = atoi(value);
750 else if (strcmp(name, "avg_send_size")==0)
751 opts->avg_send_size = atoi(value);
752 else if (strcmp(name, "recv_bufsize")==0)
753 opts->recv_bufsize = atoi(value);
754 else if (strcmp(name, "recv_threshold")==0)
755 opts->recv_threshold = atoi(value);
/*
 * name is caller-supplied and of arbitrary length, so the write into the
 * fixed-size ibw_lasterr buffer is bounded to avoid an overflow.
 */
757 snprintf(ibw_lasterr, IBW_LASTERR_BUFSIZE, "ibw_init: unknown name %s\n", name);
/*
 * Create and initialise an ibwrapper context.
 *
 * attr/nattr    name/value options, see ibw_process_init_attrs()
 * ibw_connstate callback stored as pctx->connstate_func
 * ibw_receive   callback stored as pctx->receive_func
 * ectx          event context used for fd events
 *
 * Steps visible here: allocate ctx and its private part with their talloc
 * destructors, process the options, create the CM event channel and watch
 * its fd with ibw_event_handler_cm, create the context cm_id, allocate the
 * protection domain and record the system page size.
 * NOTE(review): one parameter line, the rc checks, the return statements
 * and the cleanup label are on lines not visible in this listing.
 */
764 struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
766 ibw_connstate_fn_t ibw_connstate,
767 ibw_receive_fn_t ibw_receive,
768 struct event_context *ectx)
770 struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
771 struct ibw_ctx_priv *pctx;
774 /* initialize basic data structures */
775 memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
778 ibw_lasterr[0] = '\0';
779 talloc_set_destructor(ctx, ibw_ctx_destruct);
780 ctx->ctx_userdata = ctx_userdata;
782 pctx = talloc_zero(ctx, struct ibw_ctx_priv);
783 talloc_set_destructor(pctx, ibw_ctx_priv_destruct);
784 ctx->internal = (void *)pctx;
787 pctx->connstate_func = ibw_connstate;
788 pctx->receive_func = ibw_receive;
792 /* process attributes */
793 if (ibw_process_init_attrs(attr, nattr, &pctx->opts))
797 pctx->cm_channel = rdma_create_event_channel();
798 if (!pctx->cm_channel) {
799 sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
/* CM events are delivered through the event loop; ctx is the callback argument */
803 pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,
804 pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);
806 rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP);
809 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
812 DEBUG(10, ("created cm_id %p\n", pctx->cm_id));
815 pctx->pd = ibv_alloc_pd(pctx->cm_id->verbs);
817 sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
820 DEBUG(10, ("created pd %p\n", pctx->pd));
822 pctx->pagesize = sysconf(_SC_PAGESIZE);
825 /* don't put code here */
/*
 * ibw_lasterr may contain a caller-supplied attribute name (written by
 * ibw_process_init_attrs), so it is printed as data through "%s" and never
 * used as the format string itself.
 */
827 DEBUG(0, ("%s", ibw_lasterr));
/*
 * Stop the context: walk ctx->conn_list and call ibw_disconnect() on every
 * connection that is in state IBWC_ERROR or IBWC_CONNECTED.
 * NOTE(review): the declaration of p and the return statements are on
 * lines not visible in this listing.
 */
835 int ibw_stop(struct ibw_ctx *ctx)
839 for(p=ctx->conn_list; p!=NULL; p=p->next) {
/*
 * IBWC_* values are connection states, so the state of the connection
 * being visited is tested here, not the state of the context.
 */
840 if (p->state==IBWC_ERROR || p->state==IBWC_CONNECTED) {
841 if (ibw_disconnect(p))
/*
 * Bind the context cm_id to the local address my_addr via
 * rdma_bind_addr(). On failure an error message is stored in ibw_lasterr
 * and logged.
 * NOTE(review): the rc check and the return statements are on lines not
 * visible in this listing.
 */
849 int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr)
851 struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal;
854 rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);
856 sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);
/* NOTE(review): ibw_lasterr is passed as the format string here */
857 DEBUG(0, (ibw_lasterr));
860 DEBUG(10, ("rdma_bind_addr successful\n"));
/*
 * Start listening for incoming connection requests on the context cm_id
 * with the given backlog. On failure an error message is stored in
 * ibw_lasterr and logged.
 * NOTE(review): the rc check and the return statements are on lines not
 * visible in this listing.
 */
865 int ibw_listen(struct ibw_ctx *ctx, int backlog)
867 struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
870 DEBUG(10, ("rdma_listen...\n"));
871 rc = rdma_listen(pctx->cm_id, backlog);
873 sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);
/* NOTE(review): ibw_lasterr is passed as the format string here */
874 DEBUG(0, (ibw_lasterr));
/*
 * Accept a pending connection request: store conn_userdata on the
 * connection, call rdma_accept() on the connection cm_id with
 * responder_resources 1 and initiator_depth 1, and mark the connection as
 * accepted. The CM event handler comments say this is invoked from the
 * connstate callback during RDMA_CM_EVENT_CONNECT_REQUEST, where
 * pconn->is_accepted is inspected afterwards.
 * NOTE(review): the rc check and the return statements are on lines not
 * visible in this listing.
 */
881 int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata)
883 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
884 struct rdma_conn_param conn_param;
887 conn->conn_userdata = conn_userdata;
889 memset(&conn_param, 0, sizeof(struct rdma_conn_param));
890 conn_param.responder_resources = 1;
891 conn_param.initiator_depth = 1;
892 rc = rdma_accept(pconn->cm_id, &conn_param);
894 sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);
/* NOTE(review): ibw_lasterr is passed as the format string here */
895 DEBUG(0, (ibw_lasterr));
899 pconn->is_accepted = 1;
901 /* continued at RDMA_CM_EVENT_ESTABLISHED */
/*
 * Start an outgoing connection to serv_addr. A new connection object is
 * created, conn_userdata is stored on it, a cm_id is created on the
 * context event channel with the connection as its context pointer, and
 * address resolution is started with a 2000 ms timeout. The remaining
 * steps run in ibw_event_handler_cm(), starting at
 * RDMA_CM_EVENT_ADDR_RESOLVED.
 * NOTE(review): the rc checks and the return statements are on lines not
 * visible in this listing.
 */
906 int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
908 struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
909 struct ibw_conn *conn = NULL;
910 struct ibw_conn_priv *pconn = NULL;
913 conn = ibw_conn_new(ctx);
914 conn->conn_userdata = conn_userdata;
915 pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
917 rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
920 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
/*
 * serv_addr already is a pointer to the address structure; it is passed
 * as-is. Taking its address would hand rdma_resolve_addr() the location of
 * the parameter variable instead of the socket address.
 */
924 rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) serv_addr, 2000);
926 sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
927 DEBUG(0, (ibw_lasterr));
931 /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
/*
 * Request a disconnect via rdma_disconnect(); completion is reported
 * through RDMA_CM_EVENT_DISCONNECTED in the CM event handler. On failure an
 * error message is stored in ibw_lasterr and logged.
 * NOTE(review): the call uses the context cm_id (pctx->cm_id), not the
 * cm_id of the connection passed in (its private data holds one, see
 * ibw_connect and the CONNECT_REQUEST handling). This looks unintended -
 * confirm which id should be disconnected.
 * NOTE(review): the rc check and the return statements are on lines not
 * visible in this listing.
 */
936 int ibw_disconnect(struct ibw_conn *conn)
939 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
941 rc = rdma_disconnect(pctx->cm_id);
943 sprintf(ibw_lasterr, "ibw_disconnect failed with %d", rc);
/* NOTE(review): ibw_lasterr is passed as the format string here */
944 DEBUG(0, (ibw_lasterr));
948 /* continued at RDMA_CM_EVENT_DISCONNECTED */
/*
 * Reserve a send descriptor and return a payload buffer of n bytes in
 * *buf. The first available ibw_wr is moved to the used list. When
 * n + sizeof(long) fits into avg_send_size the payload lives in the
 * descriptor slot of the shared send buffer; otherwise a dedicated
 * buffer is allocated and registered with ibw_alloc_mr(). In both cases
 * *buf points sizeof(long) bytes past the start of the chosen buffer,
 * leaving room for a header that ibw_send() fills in.
 * NOTE(review): the NULL checks, the assignment of *key and the return
 * statements are on lines not visible in this listing.
 */
953 int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
955 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
956 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
/* head of the free list; the message below covers the empty-list case */
957 struct ibw_wr *p = pconn->wr_list_avail;
960 sprintf(ibw_lasterr, "insufficient wr chunks\n");
964 DLIST_REMOVE(pconn->wr_list_avail, p);
965 DLIST_ADD(pconn->wr_list_used, p);
967 if (n + sizeof(long) <= pctx->opts.avg_send_size) {
968 *buf = (void *)(p->msg + sizeof(long));
/* message too large for a regular slot: use a one-off registered buffer */
971 p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
973 sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
/* NOTE(review): ibw_lasterr is passed as the format string here */
974 DEBUG(0, (ibw_lasterr));
977 *buf = (void *)(p->msg_large + sizeof(long));
/*
 * Post a send for a buffer previously obtained from ibw_alloc_send_buf().
 * key identifies the ibw_wr descriptor, buf must be the pointer that was
 * returned for it, n is the payload length. The length n is written in
 * network byte order as a uint32_t at the start of the underlying buffer,
 * and the request is posted with IBV_WR_SEND / IBV_SEND_SIGNALED.
 * Returns the result of ibv_post_send().
 * NOTE(review): the remaining initialisers of list and wr and the else
 * keyword are on lines not visible in this listing.
 */
983 int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
985 struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
986 struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
987 struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
988 struct ibv_sge list = {
989 .addr = (uintptr_t) NULL,
993 struct ibv_send_wr wr = {
997 .opcode = IBV_WR_SEND,
998 .send_flags = IBV_SEND_SIGNALED,
1000 struct ibv_send_wr *bad_wr;
/*
 * The regular-slot test must be the same one ibw_alloc_send_buf() used
 * (n + sizeof(long)); with a different header size the two functions could
 * disagree on which buffer holds the message and the assert below would
 * fire for sizes in between.
 */
1002 if (n + sizeof(long)<=pctx->opts.avg_send_size) {
1003 assert((p->msg + sizeof(long))==(char *)buf);
1004 list.lkey = pconn->mr_send->lkey;
1005 list.addr = (uintptr_t) p->msg;
1007 *((uint32_t *)p->msg) = htonl(n);
1009 assert((p->msg_large + sizeof(long))==(char *)buf);
1010 assert(p->mr_large!=NULL);
1011 list.lkey = p->mr_large->lkey;
1012 list.addr = (uintptr_t) p->msg_large;
1014 *((uint32_t *)p->msg_large) = htonl(n);
1017 return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
/*
 * Accessor for the last error text.
 * NOTE(review): the body is not visible in this listing; presumably it
 * returns the module-wide ibw_lasterr buffer - confirm against the full file.
 */
1020 const char *ibw_getLastError(void)