merge IB code from peter
[metze/samba/wip.git] / ctdb / ib / ibwrapper.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Wrap Infiniband calls.
4  *
5  * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006
6  *
7  * Major code contributions by Peter Somogyi <psomogyi@gamax.hu>
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include <stdlib.h>
25 #include <string.h>
26 #include <stdio.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <malloc.h>
34 #include <assert.h>
35 #include <unistd.h>
36
37 #include "includes.h"
38 #include "lib/events/events.h"
39 #include "ibwrapper.h"
40
41 #include <rdma/rdma_cma.h>
42
43 #include "ibwrapper_internal.h"
44 #include "lib/util/dlinklist.h"
45
46 #define IBW_LASTERR_BUFSIZE 512
47 static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
48
49 static void ibw_event_handler_verbs(struct event_context *ev,
50         struct fd_event *fde, uint16_t flags, void *private_data);
51 static int ibw_fill_cq(struct ibw_conn *conn);
52
53
54 static int ibw_init_memory(struct ibw_conn *conn)
55 {
56         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
57         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
58
59         int     i;
60         struct ibw_wr   *p;
61
62         pconn->buf = memalign(pctx->pagesize, pctx->max_msg_size);
63         if (!pconn->buf) {
64                 sprintf(ibw_lasterr, "couldn't allocate work buf\n");
65                 return -1;
66         }
67         pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf,
68                 pctx->qsize * pctx->max_msg_size, IBV_ACCESS_LOCAL_WRITE);
69         if (!pconn->mr) {
70                 sprintf(ibw_lasterr, "couldn't allocate mr\n");
71                 return -1;
72         }
73
74         pconn->wr_index = talloc_size(pconn, pctx->qsize * sizeof(struct ibw_wr *));
75
76         for(i=0; i<pctx->qsize; i++) {
77                 p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
78                 p->msg = pconn->buf + (i * pctx->max_msg_size);
79                 p->wr_id = i;
80
81                 DLIST_ADD(pconn->wr_list_avail, p);
82         }
83
84         return 0;
85 }
86
87 static int ibw_ctx_priv_destruct(struct ibw_ctx_priv *pctx)
88 {
89         if (pctx->pd) {
90                 ibv_dealloc_pd(pctx->pd);
91                 pctx->pd = NULL;
92         }
93
94         /* destroy cm */
95         if (pctx->cm_channel) {
96                 rdma_destroy_event_channel(pctx->cm_channel);
97                 pctx->cm_channel = NULL;
98         }
99         if (pctx->cm_channel_event) {
100                 /* TODO: do we have to do this here? */
101                 talloc_free(pctx->cm_channel_event);
102                 pctx->cm_channel_event = NULL;
103         }
104         if (pctx->cm_id) {
105                 rdma_destroy_id(pctx->cm_id);
106                 pctx->cm_id = NULL;
107         }
108
109         return 0;
110 }
111
112 static int ibw_ctx_destruct(struct ibw_ctx *ctx)
113 {
114         return 0;
115 }
116
117 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
118 {
119         /* free memory regions */
120         if (pconn->mr) {
121                 ibv_dereg_mr(pconn->mr);
122                 pconn->mr = NULL;
123         }
124         if (pconn->buf) {
125                 free(pconn->buf); /* memalign-ed */
126                 pconn->buf = NULL;
127         }
128
129         /* pconn->wr_index is freed by talloc */
130         /* pconn->wr_index[i] are freed by talloc */
131
132         /* destroy verbs */
133         if (pconn->cm_id->qp) {
134                 ibv_destroy_qp(pconn->cm_id->qp);
135                 pconn->cm_id->qp = NULL;
136         }
137         if (pconn->cq) {
138                 ibv_destroy_cq(pconn->cq);
139                 pconn->cq = NULL;
140         }
141         if (pconn->verbs_channel) {
142                 ibv_destroy_comp_channel(pconn->verbs_channel);
143                 pconn->verbs_channel = NULL;
144         }
145         if (pconn->verbs_channel_event) {
146                 /* TODO: do we have to do this here? */
147                 talloc_free(pconn->verbs_channel_event);
148                 pconn->verbs_channel_event = NULL;
149         }
150         if (pconn->cm_id) {
151                 rdma_destroy_id(pconn->cm_id);
152                 pconn->cm_id = NULL;
153         }
154         return 0;
155 }
156
157 static int ibw_conn_destruct(struct ibw_conn *conn)
158 {
159         /* important here: ctx is a talloc _parent_ */
160         DLIST_REMOVE(conn->ctx->conn_list, conn);
161         return 0;
162 }
163
164 static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
165 {
166         struct ibw_conn *conn;
167         struct ibw_conn_priv *pconn;
168
169         conn = talloc_zero(ctx, struct ibw_conn);
170         assert(conn!=NULL);
171         talloc_set_destructor(conn, ibw_conn_destruct);
172
173         pconn = talloc_zero(ctx, struct ibw_conn_priv);
174         assert(pconn!=NULL);
175         talloc_set_destructor(pconn, ibw_conn_priv_destruct);
176
177         conn->ctx = ctx;
178
179         DLIST_ADD(ctx->conn_list, conn);
180
181         return conn;
182 }
183
184 static int ibw_setup_cq_qp(struct ibw_conn *conn)
185 {
186         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
187         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
188         struct ibv_qp_init_attr init_attr;
189         int rc;
190
191         /* init mr */
192         if (ibw_init_memory(conn))
193                 return -1;
194
195         /* init verbs */
196         pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs);
197         if (!pconn->verbs_channel) {
198                 sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
199                 return -1;
200         }
201         DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
202
203         pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
204                 pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
205
206         /* init cq */
207         pconn->cq = ibv_create_cq(pconn->cm_id->verbs, pctx->qsize,
208                 conn, pconn->verbs_channel, 0);
209         if (pconn->cq==NULL) {
210                 sprintf(ibw_lasterr, "ibv_create_cq failed\n");
211                 return -1;
212         }
213
214         rc = ibv_req_notify_cq(pconn->cq, 0);
215         if (rc) {
216                 sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
217                 return rc;
218         }
219
220         /* init qp */
221         memset(&init_attr, 0, sizeof(init_attr));
222         init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
223         init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
224         init_attr.cap.max_recv_sge = 1;
225         init_attr.cap.max_send_sge = 1;
226         init_attr.qp_type = IBV_QPT_RC;
227         init_attr.send_cq = pconn->cq;
228         init_attr.recv_cq = pconn->cq;
229
230         rc = rdma_create_qp(pconn->cm_id, pctx->pd, &init_attr);
231         if (rc) {
232                 sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc);
233                 return rc;
234         }
235         /* elase result is in pconn->cm_id->qp */
236
237         return ibw_fill_cq(conn);
238 }
239
240 static int ibw_refill_cq_recv(struct ibw_conn *conn)
241 {
242         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
243         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
244         int     rc;
245         struct ibv_sge list = {
246                 .addr   = (uintptr_t) NULL,
247                 .length = pctx->max_msg_size,
248                 .lkey   = pconn->mr->lkey
249         };
250         struct ibv_recv_wr wr = {
251                 .wr_id      = 0,
252                 .sg_list    = &list,
253                 .num_sge    = 1,
254         };
255         struct ibv_recv_wr *bad_wr;
256         struct ibw_wr   *p = pconn->wr_list_avail;
257
258         if (p==NULL) {
259                 sprintf(ibw_lasterr, "out of wr_list_avail");
260                 DEBUG(0, (ibw_lasterr));
261                 return -1;
262         }
263         DLIST_REMOVE(pconn->wr_list_avail, p);
264         DLIST_ADD(pconn->wr_list_used, p);
265         list.addr = (uintptr_t) p->msg;
266         wr.wr_id = p->wr_id;
267
268         rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
269         if (rc) {
270                 sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
271                 DEBUG(0, (ibw_lasterr));
272                 return -2;
273         }
274
275         return 0;
276 }
277
278 static int ibw_fill_cq(struct ibw_conn *conn)
279 {
280         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
281         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
282         int     i, rc;
283         struct ibv_sge list = {
284                 .addr   = (uintptr_t) NULL,
285                 .length = pctx->max_msg_size,
286                 .lkey   = pconn->mr->lkey
287         };
288         struct ibv_recv_wr wr = {
289                 .wr_id      = 0,
290                 .sg_list    = &list,
291                 .num_sge    = 1,
292         };
293         struct ibv_recv_wr *bad_wr;
294         struct ibw_wr   *p;
295
296         for(i = pctx->opts.max_recv_wr; i!=0; i--) {
297                 p = pconn->wr_list_avail;
298                 if (p==NULL) {
299                         sprintf(ibw_lasterr, "out of wr_list_avail");
300                         DEBUG(0, (ibw_lasterr));
301                         return -1;
302                 }
303                 DLIST_REMOVE(pconn->wr_list_avail, p);
304                 DLIST_ADD(pconn->wr_list_used, p);
305                 list.addr = (uintptr_t) p->msg;
306                 wr.wr_id = p->wr_id;
307
308                 rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
309                 if (rc) {
310                         sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
311                         DEBUG(0, (ibw_lasterr));
312                         return -2;
313                 }
314         }
315
316         return 0;
317 }
318
319 static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
320 {
321         struct rdma_conn_param conn_param;
322         int     rc;
323
324         rc = ibw_setup_cq_qp(conn);
325         if (rc)
326                 return -1;
327
328         /* cm connect */
329         memset(&conn_param, 0, sizeof conn_param);
330         conn_param.responder_resources = 1;
331         conn_param.initiator_depth = 1;
332         conn_param.retry_count = 10;
333
334         rc = rdma_connect(cma_id, &conn_param);
335         if (rc)
336                 sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
337
338         return rc;
339 }
340
341 static void ibw_event_handler_cm(struct event_context *ev,
342         struct fd_event *fde, uint16_t flags, void *private_data)
343 {
344         int     rc;
345         struct ibw_ctx  *ctx = talloc_get_type(private_data, struct ibw_ctx);
346         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
347         struct ibw_conn *conn = NULL;
348         struct ibw_conn_priv *pconn = NULL;
349         struct rdma_cm_id *cma_id = NULL;
350         struct rdma_cm_event *event = NULL;
351
352         assert(ctx!=NULL);
353
354         rc = rdma_get_cm_event(pctx->cm_channel, &event);
355         if (rc) {
356                 ctx->state = IBWS_ERROR;
357                 sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
358                 goto error;
359         }
360         cma_id = event->id;
361
362         DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event->event, cma_id,
363                   (cma_id == pctx->cm_id) ? "parent" : "child"));
364
365         switch (event->event) {
366         case RDMA_CM_EVENT_ADDR_RESOLVED:
367                 /* continuing from ibw_connect ... */
368                 rc = rdma_resolve_route(cma_id, 2000);
369                 if (rc) {
370                         sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
371                         goto error;
372                 }
373                 /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
374                 break;
375
376         case RDMA_CM_EVENT_ROUTE_RESOLVED:
377                 /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
378                 assert(cma_id->context!=NULL);
379                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
380
381                 rc = ibw_manage_connect(conn, cma_id);
382                 if (rc)
383                         goto error;
384
385                 break;
386
387         case RDMA_CM_EVENT_CONNECT_REQUEST:
388                 ctx->state = IBWS_CONNECT_REQUEST;
389                 conn = ibw_conn_new(ctx);
390                 pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
391                 pconn->cm_id = cma_id; /* !!! event will be freed but id not */
392                 cma_id->context = (void *)conn;
393                 DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id));
394
395                 conn->state = IBWC_INIT;
396                 pctx->connstate_func(ctx, conn);
397
398                 /* continued at ibw_accept when invoked by the func above */
399                 if (!pconn->is_accepted) {
400                         talloc_free(conn);
401                         DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
402                 } else {
403                         if (ibw_setup_cq_qp(conn))
404                                 goto error;
405                 }
406
407                 /* TODO: clarify whether if it's needed by upper layer: */
408                 ctx->state = IBWS_READY;
409                 pctx->connstate_func(ctx, NULL);
410
411                 /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
412                 break;
413
414         case RDMA_CM_EVENT_ESTABLISHED:
415                 /* expected after ibw_accept and ibw_connect[not directly] */
416                 DEBUG(0, ("ESTABLISHED (conn: %u)\n", (unsigned int)cma_id->context));
417                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
418                 assert(conn!=NULL); /* important assumption */
419
420                 /* client conn is up */
421                 conn->state = IBWC_CONNECTED;
422
423                 /* both ctx and conn have changed */
424                 pctx->connstate_func(ctx, conn);
425                 break;
426
427         case RDMA_CM_EVENT_ADDR_ERROR:
428         case RDMA_CM_EVENT_ROUTE_ERROR:
429         case RDMA_CM_EVENT_CONNECT_ERROR:
430         case RDMA_CM_EVENT_UNREACHABLE:
431         case RDMA_CM_EVENT_REJECTED:
432                 sprintf(ibw_lasterr, "cma event %d, error %d\n", event->event, event->status);
433                 goto error;
434
435         case RDMA_CM_EVENT_DISCONNECTED:
436                 if (cma_id!=pctx->cm_id) {
437                         DEBUG(0, ("client DISCONNECT event\n"));
438                         conn = talloc_get_type(cma_id->context, struct ibw_conn);
439                         conn->state = IBWC_DISCONNECTED;
440                         pctx->connstate_func(NULL, conn);
441
442                         talloc_free(conn);
443
444                         /* if we are the last... */
445                         if (ctx->conn_list==NULL)
446                                 rdma_disconnect(pctx->cm_id);
447                 } else {
448                         DEBUG(0, ("server DISCONNECT event\n"));
449                         ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
450                         /* talloc_free(ctx) should be called within or after this func */
451                         pctx->connstate_func(ctx, NULL);
452                 }
453                 break;
454
455         case RDMA_CM_EVENT_DEVICE_REMOVAL:
456                 sprintf(ibw_lasterr, "cma detected device removal!\n");
457                 goto error;
458
459         default:
460                 sprintf(ibw_lasterr, "unknown event %d\n", event->event);
461                 goto error;
462         }
463
464         if ((rc=rdma_ack_cm_event(event))) {
465                 sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
466                 goto error;
467         }
468
469         return;
470 error:
471         DEBUG(0, ("cm event handler: %s", ibw_lasterr));
472         if (cma_id!=pctx->cm_id) {
473                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
474                 if (conn)
475                         conn->state = IBWC_ERROR;
476                 pctx->connstate_func(NULL, conn);
477         } else {
478                 ctx->state = IBWS_ERROR;
479                 pctx->connstate_func(ctx, NULL);
480         }
481 }
482
483 static void ibw_event_handler_verbs(struct event_context *ev,
484         struct fd_event *fde, uint16_t flags, void *private_data)
485 {
486         struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn);
487         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
488         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
489
490         struct ibv_wc wc;
491         int rc;
492
493         rc = ibv_poll_cq(pconn->cq, 1, &wc);
494         if (rc!=1) {
495                 sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
496                 goto error;
497         }
498         if (wc.status) {
499                 sprintf(ibw_lasterr, "cq completion failed status %d\n",
500                         wc.status);
501                 goto error;
502         }
503
504         switch(wc.opcode) {
505         case IBV_WC_SEND:
506                 {
507                         struct ibw_wr   *p;
508         
509                         DEBUG(10, ("send completion\n"));
510                         assert(pconn->cm_id->qp->qp_num==wc.qp_num);
511                         assert(wc.wr_id < pctx->qsize);
512                         p = pconn->wr_index[wc.wr_id];
513                         DLIST_REMOVE(pconn->wr_list_used, p);
514                         DLIST_ADD(pconn->wr_list_avail, p);
515                 }
516                 break;
517
518         case IBV_WC_RDMA_WRITE:
519                 DEBUG(10, ("rdma write completion\n"));
520                 break;
521
522         case IBV_WC_RDMA_READ:
523                 DEBUG(10, ("rdma read completion\n"));
524                 break;
525
526         case IBV_WC_RECV:
527                 {
528                         struct ibw_wr   *p;
529         
530                         assert(pconn->cm_id->qp->qp_num==wc.qp_num);
531                         assert(wc.wr_id < pctx->qsize);
532                         p = pconn->wr_index[wc.wr_id];
533         
534                         DLIST_REMOVE(pconn->wr_list_used, p);
535                         DLIST_ADD(pconn->wr_list_avail, p);
536         
537                         DEBUG(10, ("recv completion\n"));
538                         assert(wc.byte_len <= pctx->max_msg_size);
539         
540                         pctx->receive_func(conn, p->msg, wc.byte_len);
541                         if (ibw_refill_cq_recv(conn))
542                                 goto error;
543                 }
544                 break;
545
546         default:
547                 sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
548                 goto error;
549         }
550
551         return;
552 error:
553         DEBUG(0, (ibw_lasterr));
554         conn->state = IBWC_ERROR;
555         pctx->connstate_func(NULL, conn);
556 }
557
558 static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
559 {
560         int     i;
561         const char *name, *value;
562
563         opts->max_send_wr = 256;
564         opts->max_recv_wr = 1024;
565
566         for(i=0; i<nattr; i++) {
567                 name = attr[i].name;
568                 value = attr[i].value;
569
570                 assert(name!=NULL && value!=NULL);
571                 if (strcmp(name, "max_send_wr")==0)
572                         opts->max_send_wr = atoi(value);
573                 else if (strcmp(name, "max_recv_wr")==0)
574                         opts->max_recv_wr = atoi(value);
575                 else {
576                         sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
577                         return -1;
578                 }
579         }
580         return 0;
581 }
582
583 struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
584         void *ctx_userdata,
585         ibw_connstate_fn_t ibw_connstate,
586         ibw_receive_fn_t ibw_receive,
587         struct event_context *ectx,
588         int max_msg_size)
589 {
590         struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
591         struct ibw_ctx_priv *pctx;
592         int     rc;
593
594         /* initialize basic data structures */
595         memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
596
597         assert(ctx!=NULL);
598         ibw_lasterr[0] = '\0';
599         talloc_set_destructor(ctx, ibw_ctx_destruct);
600         ctx->ctx_userdata = ctx_userdata;
601
602         pctx = talloc_zero(ctx, struct ibw_ctx_priv);
603         talloc_set_destructor(pctx, ibw_ctx_priv_destruct);
604         ctx->internal = (void *)pctx;
605         assert(pctx!=NULL);
606
607         pctx->connstate_func = ibw_connstate;
608         pctx->receive_func = ibw_receive;
609
610         pctx->ectx = ectx;
611
612         /* process attributes */
613         if (ibw_process_init_attrs(attr, nattr, &pctx->opts))
614                 goto cleanup;
615
616         /* init cm */
617         pctx->cm_channel = rdma_create_event_channel();
618         if (!pctx->cm_channel) {
619                 sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
620                 goto cleanup;
621         }
622
623         pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,
624                 pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);
625
626         rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP);
627         if (rc) {
628                 rc = errno;
629                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
630                 goto cleanup;
631         }
632         DEBUG(10, ("created cm_id %p\n", pctx->cm_id));
633
634         /* init verbs */
635         pctx->pd = ibv_alloc_pd(pctx->cm_id->verbs);
636         if (!pctx->pd) {
637                 sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
638                 goto cleanup;
639         }
640         DEBUG(10, ("created pd %p\n", pctx->pd));
641
642         pctx->pagesize = sysconf(_SC_PAGESIZE);
643         pctx->qsize = pctx->opts.max_send_wr + pctx->opts.max_recv_wr;
644         pctx->max_msg_size = max_msg_size;
645
646         return ctx;
647         /* don't put code here */
648 cleanup:
649         DEBUG(0, (ibw_lasterr));
650
651         if (ctx)
652                 talloc_free(ctx);
653
654         return NULL;
655 }
656
657 int ibw_stop(struct ibw_ctx *ctx)
658 {
659         struct ibw_conn *p;
660
661         for(p=ctx->conn_list; p!=NULL; p=p->next) {
662                 if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) {
663                         if (ibw_disconnect(p))
664                                 return -1;
665                 }
666         }
667
668         return 0;
669 }
670
671 int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr)
672 {
673         struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal;
674         int     rc;
675
676         rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);
677         if (rc) {
678                 sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);
679                 DEBUG(0, (ibw_lasterr));
680                 return rc;
681         }
682         DEBUG(10, ("rdma_bind_addr successful\n"));
683
684         return 0;
685 }
686
687 int ibw_listen(struct ibw_ctx *ctx, int backlog)
688 {
689         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
690         int     rc;
691
692         DEBUG(10, ("rdma_listen...\n"));
693         rc = rdma_listen(pctx->cm_id, backlog);
694         if (rc) {
695                 sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);
696                 DEBUG(0, (ibw_lasterr));
697                 return rc;
698         }       
699
700         return 0;
701 }
702
703 int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata)
704 {
705         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
706         struct rdma_conn_param  conn_param;
707         int     rc;
708
709         conn->conn_userdata = conn_userdata;
710
711         memset(&conn_param, 0, sizeof(struct rdma_conn_param));
712         conn_param.responder_resources = 1;
713         conn_param.initiator_depth = 1;
714         rc = rdma_accept(pconn->cm_id, &conn_param);
715         if (rc) {
716                 sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);
717                 DEBUG(0, (ibw_lasterr));
718                 return -1;;
719         }
720
721         pconn->is_accepted = 1;
722
723         /* continued at RDMA_CM_EVENT_ESTABLISHED */
724
725         return 0;
726 }
727
728 int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
729 {
730         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
731         struct ibw_conn *conn = NULL;
732         struct ibw_conn_priv *pconn = NULL;
733         int     rc;
734
735         conn = ibw_conn_new(ctx);
736         conn->conn_userdata = conn_userdata;
737         pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
738
739         rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
740         if (rc) {
741                 rc = errno;
742                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
743                 return rc;
744         }
745
746         rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
747         if (rc) {
748                 sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
749                 DEBUG(0, (ibw_lasterr));
750                 return -1;
751         }
752
753         /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
754
755         return 0;
756 }
757
758 int ibw_disconnect(struct ibw_conn *conn)
759 {
760         int     rc;
761         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
762
763         rc = rdma_disconnect(pctx->cm_id);
764         if (rc) {
765                 sprintf(ibw_lasterr, "ibw_disconnect failed with %d", rc);
766                 DEBUG(0, (ibw_lasterr));
767                 return rc;
768         }
769
770         /* continued at RDMA_CM_EVENT_DISCONNECTED */
771
772         return 0;
773 }
774
775 int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
776 {
777         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
778         struct ibw_wr *p = pconn->wr_list_avail;
779
780         if (p==NULL) {
781                 sprintf(ibw_lasterr, "insufficient wr chunks\n");
782                 return -1;
783         }
784
785         DLIST_REMOVE(pconn->wr_list_avail, p);
786         DLIST_ADD(pconn->wr_list_used, p);
787
788         *buf = (void *)p->msg;
789         *key = (void *)p;
790
791         return 0;
792 }
793
794 int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
795 {
796         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
797         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
798         struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
799         struct ibv_sge list = {
800                 .addr   = (uintptr_t) p->msg,
801                 .length = n,
802                 .lkey   = pconn->mr->lkey
803         };
804         struct ibv_send_wr wr = {
805                 .wr_id      = p->wr_id,
806                 .sg_list    = &list,
807                 .num_sge    = 1,
808                 .opcode     = IBV_WR_SEND,
809                 .send_flags = IBV_SEND_SIGNALED,
810         };
811         struct ibv_send_wr *bad_wr;
812
813         assert(p->msg==(char *)buf);
814         assert(n<=pctx->max_msg_size);
815
816         return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
817 }
818
819 const char *ibw_getLastError(void)
820 {
821         return ibw_lasterr;
822 }