Modified send logic to allow large messages.
[metze/samba/wip.git] / ctdb / ib / ibwrapper.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Wrap Infiniband calls.
4  *
5  * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006
6  *
7  * Major code contributions by Peter Somogyi <psomogyi@gamax.hu>
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include <stdlib.h>
25 #include <string.h>
26 #include <stdio.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <malloc.h>
34 #include <assert.h>
35 #include <unistd.h>
36
37 #include "includes.h"
38 #include "lib/events/events.h"
39 #include "ibwrapper.h"
40
41 #include <rdma/rdma_cma.h>
42
43 #include "ibwrapper_internal.h"
44 #include "lib/util/dlinklist.h"
45
46 #define IBW_LASTERR_BUFSIZE 512
47 static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
48
49 static void ibw_event_handler_verbs(struct event_context *ev,
50         struct fd_event *fde, uint16_t flags, void *private_data);
51 static int ibw_fill_cq(struct ibw_conn *conn);
52
53 static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
54         int n, struct ibv_mr **ppmr)
55 {
56         void *buf;
57         buf = memalign(pctx->pagesize, n);
58         if (!buf) {
59                 sprintf(ibw_lasterr, "couldn't allocate memory\n");
60                 return NULL;
61         }
62
63         *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
64         if (!*ppmr) {
65                 sprintf(ibw_lasterr, "couldn't allocate mr\n");
66                 free(buf);
67                 return NULL;
68         }
69
70         return buf;
71 }
72
73 static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
74 {
75         if (*ppmr!=NULL) {
76                 ibv_dereg_mr(*ppmr);
77                 *ppmr = NULL;
78         }
79         if (*ppbuf) {
80                 free(*ppbuf);
81                 *ppbuf = NULL;
82         }
83 }
84
85 static int ibw_init_memory(struct ibw_conn *conn)
86 {
87         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
88         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
89
90         int     i;
91         struct ibw_wr   *p;
92
93         pconn->buf_send = ibw_alloc_mr(pctx, pconn,
94                 pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
95         if (!pconn->buf_send) {
96                 sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
97                 return -1;
98         }
99
100         pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
101                 pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
102         if (!pconn->buf_recv) {
103                 sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
104                 return -1;
105         }
106
107         pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
108         assert(pconn->wr_index!=NULL);
109
110         for(i=0; i<pctx->opts.max_send_wr; i++) {
111                 p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
112                 p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
113                 p->wr_id = i;
114
115                 DLIST_ADD(pconn->wr_list_avail, p);
116         }
117
118         return 0;
119 }
120
121 static int ibw_ctx_priv_destruct(struct ibw_ctx_priv *pctx)
122 {
123         if (pctx->pd) {
124                 ibv_dealloc_pd(pctx->pd);
125                 pctx->pd = NULL;
126         }
127
128         /* destroy cm */
129         if (pctx->cm_channel) {
130                 rdma_destroy_event_channel(pctx->cm_channel);
131                 pctx->cm_channel = NULL;
132         }
133         if (pctx->cm_channel_event) {
134                 /* TODO: do we have to do this here? */
135                 talloc_free(pctx->cm_channel_event);
136                 pctx->cm_channel_event = NULL;
137         }
138         if (pctx->cm_id) {
139                 rdma_destroy_id(pctx->cm_id);
140                 pctx->cm_id = NULL;
141         }
142
143         return 0;
144 }
145
146 static int ibw_ctx_destruct(struct ibw_ctx *ctx)
147 {
148         return 0;
149 }
150
151 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
152 {
153         /* free memory regions */
154         ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
155         ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
156
157         /* pconn->wr_index is freed by talloc */
158         /* pconn->wr_index[i] are freed by talloc */
159
160         /* destroy verbs */
161         if (pconn->cm_id->qp) {
162                 ibv_destroy_qp(pconn->cm_id->qp);
163                 pconn->cm_id->qp = NULL;
164         }
165         if (pconn->cq) {
166                 ibv_destroy_cq(pconn->cq);
167                 pconn->cq = NULL;
168         }
169         if (pconn->verbs_channel) {
170                 ibv_destroy_comp_channel(pconn->verbs_channel);
171                 pconn->verbs_channel = NULL;
172         }
173         if (pconn->verbs_channel_event) {
174                 /* TODO: do we have to do this here? */
175                 talloc_free(pconn->verbs_channel_event);
176                 pconn->verbs_channel_event = NULL;
177         }
178         if (pconn->cm_id) {
179                 rdma_destroy_id(pconn->cm_id);
180                 pconn->cm_id = NULL;
181         }
182         return 0;
183 }
184
185 static int ibw_conn_destruct(struct ibw_conn *conn)
186 {
187         /* important here: ctx is a talloc _parent_ */
188         DLIST_REMOVE(conn->ctx->conn_list, conn);
189         return 0;
190 }
191
192 static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
193 {
194         struct ibw_conn *conn;
195         struct ibw_conn_priv *pconn;
196
197         conn = talloc_zero(ctx, struct ibw_conn);
198         assert(conn!=NULL);
199         talloc_set_destructor(conn, ibw_conn_destruct);
200
201         pconn = talloc_zero(ctx, struct ibw_conn_priv);
202         assert(pconn!=NULL);
203         talloc_set_destructor(pconn, ibw_conn_priv_destruct);
204
205         conn->ctx = ctx;
206
207         DLIST_ADD(ctx->conn_list, conn);
208
209         return conn;
210 }
211
212 static int ibw_setup_cq_qp(struct ibw_conn *conn)
213 {
214         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
215         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
216         struct ibv_qp_init_attr init_attr;
217         int rc;
218
219         /* init mr */
220         if (ibw_init_memory(conn))
221                 return -1;
222
223         /* init verbs */
224         pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs);
225         if (!pconn->verbs_channel) {
226                 sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
227                 return -1;
228         }
229         DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
230
231         pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
232                 pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
233
234         /* init cq */
235         pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
236                 pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
237                 conn, pconn->verbs_channel, 0);
238         if (pconn->cq==NULL) {
239                 sprintf(ibw_lasterr, "ibv_create_cq failed\n");
240                 return -1;
241         }
242
243         rc = ibv_req_notify_cq(pconn->cq, 0);
244         if (rc) {
245                 sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
246                 return rc;
247         }
248
249         /* init qp */
250         memset(&init_attr, 0, sizeof(init_attr));
251         init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
252         init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
253         init_attr.cap.max_recv_sge = 1;
254         init_attr.cap.max_send_sge = 1;
255         init_attr.qp_type = IBV_QPT_RC;
256         init_attr.send_cq = pconn->cq;
257         init_attr.recv_cq = pconn->cq;
258
259         rc = rdma_create_qp(pconn->cm_id, pctx->pd, &init_attr);
260         if (rc) {
261                 sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc);
262                 return rc;
263         }
264         /* elase result is in pconn->cm_id->qp */
265
266         return ibw_fill_cq(conn);
267 }
268
269 static int ibw_refill_cq_recv(struct ibw_conn *conn)
270 {
271         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
272         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
273         int     rc;
274         struct ibv_sge list = {
275                 .addr   = (uintptr_t) NULL,
276                 .length = pctx->opts.recv_bufsize,
277                 .lkey   = pconn->mr_recv->lkey
278         };
279         struct ibv_recv_wr wr = {
280                 .wr_id      = 0,
281                 .sg_list    = &list,
282                 .num_sge    = 1,
283         };
284         struct ibv_recv_wr *bad_wr;
285
286         list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
287         wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
288         pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
289
290         rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
291         if (rc) {
292                 sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
293                 DEBUG(0, (ibw_lasterr));
294                 return -2;
295         }
296
297         return 0;
298 }
299
300 static int ibw_fill_cq(struct ibw_conn *conn)
301 {
302         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
303         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
304         int     i, rc;
305         struct ibv_sge list = {
306                 .addr   = (uintptr_t) NULL,
307                 .length = pctx->opts.recv_bufsize,
308                 .lkey   = pconn->mr_recv->lkey
309         };
310         struct ibv_recv_wr wr = {
311                 .wr_id      = 0,
312                 .sg_list    = &list,
313                 .num_sge    = 1,
314         };
315         struct ibv_recv_wr *bad_wr;
316
317         for(i = pctx->opts.max_recv_wr; i!=0; i--) {
318                 list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
319                 wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
320                 pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
321
322                 rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
323                 if (rc) {
324                         sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
325                         DEBUG(0, (ibw_lasterr));
326                         return -2;
327                 }
328         }
329
330         return 0;
331 }
332
333 static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
334 {
335         struct rdma_conn_param conn_param;
336         int     rc;
337
338         rc = ibw_setup_cq_qp(conn);
339         if (rc)
340                 return -1;
341
342         /* cm connect */
343         memset(&conn_param, 0, sizeof conn_param);
344         conn_param.responder_resources = 1;
345         conn_param.initiator_depth = 1;
346         conn_param.retry_count = 10;
347
348         rc = rdma_connect(cma_id, &conn_param);
349         if (rc)
350                 sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
351
352         return rc;
353 }
354
355 static void ibw_event_handler_cm(struct event_context *ev,
356         struct fd_event *fde, uint16_t flags, void *private_data)
357 {
358         int     rc;
359         struct ibw_ctx  *ctx = talloc_get_type(private_data, struct ibw_ctx);
360         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
361         struct ibw_conn *conn = NULL;
362         struct ibw_conn_priv *pconn = NULL;
363         struct rdma_cm_id *cma_id = NULL;
364         struct rdma_cm_event *event = NULL;
365
366         assert(ctx!=NULL);
367
368         rc = rdma_get_cm_event(pctx->cm_channel, &event);
369         if (rc) {
370                 ctx->state = IBWS_ERROR;
371                 sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
372                 goto error;
373         }
374         cma_id = event->id;
375
376         DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event->event, cma_id,
377                   (cma_id == pctx->cm_id) ? "parent" : "child"));
378
379         switch (event->event) {
380         case RDMA_CM_EVENT_ADDR_RESOLVED:
381                 /* continuing from ibw_connect ... */
382                 rc = rdma_resolve_route(cma_id, 2000);
383                 if (rc) {
384                         sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
385                         goto error;
386                 }
387                 /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
388                 break;
389
390         case RDMA_CM_EVENT_ROUTE_RESOLVED:
391                 /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
392                 assert(cma_id->context!=NULL);
393                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
394
395                 rc = ibw_manage_connect(conn, cma_id);
396                 if (rc)
397                         goto error;
398
399                 break;
400
401         case RDMA_CM_EVENT_CONNECT_REQUEST:
402                 ctx->state = IBWS_CONNECT_REQUEST;
403                 conn = ibw_conn_new(ctx);
404                 pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
405                 pconn->cm_id = cma_id; /* !!! event will be freed but id not */
406                 cma_id->context = (void *)conn;
407                 DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id));
408
409                 conn->state = IBWC_INIT;
410                 pctx->connstate_func(ctx, conn);
411
412                 /* continued at ibw_accept when invoked by the func above */
413                 if (!pconn->is_accepted) {
414                         talloc_free(conn);
415                         DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
416                 } else {
417                         if (ibw_setup_cq_qp(conn))
418                                 goto error;
419                 }
420
421                 /* TODO: clarify whether if it's needed by upper layer: */
422                 ctx->state = IBWS_READY;
423                 pctx->connstate_func(ctx, NULL);
424
425                 /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
426                 break;
427
428         case RDMA_CM_EVENT_ESTABLISHED:
429                 /* expected after ibw_accept and ibw_connect[not directly] */
430                 DEBUG(0, ("ESTABLISHED (conn: %u)\n", (unsigned int)cma_id->context));
431                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
432                 assert(conn!=NULL); /* important assumption */
433
434                 /* client conn is up */
435                 conn->state = IBWC_CONNECTED;
436
437                 /* both ctx and conn have changed */
438                 pctx->connstate_func(ctx, conn);
439                 break;
440
441         case RDMA_CM_EVENT_ADDR_ERROR:
442         case RDMA_CM_EVENT_ROUTE_ERROR:
443         case RDMA_CM_EVENT_CONNECT_ERROR:
444         case RDMA_CM_EVENT_UNREACHABLE:
445         case RDMA_CM_EVENT_REJECTED:
446                 sprintf(ibw_lasterr, "cma event %d, error %d\n", event->event, event->status);
447                 goto error;
448
449         case RDMA_CM_EVENT_DISCONNECTED:
450                 if (cma_id!=pctx->cm_id) {
451                         DEBUG(0, ("client DISCONNECT event\n"));
452                         conn = talloc_get_type(cma_id->context, struct ibw_conn);
453                         conn->state = IBWC_DISCONNECTED;
454                         pctx->connstate_func(NULL, conn);
455
456                         talloc_free(conn);
457
458                         /* if we are the last... */
459                         if (ctx->conn_list==NULL)
460                                 rdma_disconnect(pctx->cm_id);
461                 } else {
462                         DEBUG(0, ("server DISCONNECT event\n"));
463                         ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
464                         /* talloc_free(ctx) should be called within or after this func */
465                         pctx->connstate_func(ctx, NULL);
466                 }
467                 break;
468
469         case RDMA_CM_EVENT_DEVICE_REMOVAL:
470                 sprintf(ibw_lasterr, "cma detected device removal!\n");
471                 goto error;
472
473         default:
474                 sprintf(ibw_lasterr, "unknown event %d\n", event->event);
475                 goto error;
476         }
477
478         if ((rc=rdma_ack_cm_event(event))) {
479                 sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
480                 goto error;
481         }
482
483         return;
484 error:
485         DEBUG(0, ("cm event handler: %s", ibw_lasterr));
486         if (cma_id!=pctx->cm_id) {
487                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
488                 if (conn)
489                         conn->state = IBWC_ERROR;
490                 pctx->connstate_func(NULL, conn);
491         } else {
492                 ctx->state = IBWS_ERROR;
493                 pctx->connstate_func(ctx, NULL);
494         }
495 }
496
497 static void ibw_event_handler_verbs(struct event_context *ev,
498         struct fd_event *fde, uint16_t flags, void *private_data)
499 {
500         struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn);
501         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
502         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
503
504         struct ibv_wc wc;
505         int rc;
506
507         rc = ibv_poll_cq(pconn->cq, 1, &wc);
508         if (rc!=1) {
509                 sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
510                 goto error;
511         }
512         if (wc.status) {
513                 sprintf(ibw_lasterr, "cq completion failed status %d\n",
514                         wc.status);
515                 goto error;
516         }
517
518         switch(wc.opcode) {
519         case IBV_WC_SEND:
520                 {
521                         struct ibw_wr   *p;
522         
523                         DEBUG(10, ("send completion\n"));
524                         assert(pconn->cm_id->qp->qp_num==wc.qp_num);
525                         assert(wc.wr_id < pctx->opts.max_send_wr);
526
527                         p = pconn->wr_index[wc.wr_id];
528                         if (p->msg_large) {
529                                 ibw_free_mr(&p->msg_large, &p->mr_large);
530                         }
531
532                         DLIST_REMOVE(pconn->wr_list_used, p);
533                         DLIST_ADD(pconn->wr_list_avail, p);
534                 }
535                 break;
536
537         case IBV_WC_RDMA_WRITE:
538                 DEBUG(10, ("rdma write completion\n"));
539                 break;
540
541         case IBV_WC_RDMA_READ:
542                 DEBUG(10, ("rdma read completion\n"));
543                 break;
544
545         case IBV_WC_RECV:
546                 {
547                         int     recv_index;
548
549                         DEBUG(10, ("recv completion\n"));
550                         assert(pconn->cm_id->qp->qp_num==wc.qp_num);
551                         assert((int)wc.wr_id > pctx->opts.max_send_wr);
552                         recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
553                         assert(recv_index < pctx->opts.max_recv_wr);
554                         assert(wc.byte_len <= pctx->opts.recv_bufsize);
555
556 /* TODO: take care of fragmented messages !!! */
557                         pctx->receive_func(conn,
558                                 pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
559                                 wc.byte_len);
560                         if (ibw_refill_cq_recv(conn))
561                                 goto error;
562                 }
563                 break;
564
565         default:
566                 sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
567                 goto error;
568         }
569
570         return;
571 error:
572         DEBUG(0, (ibw_lasterr));
573         conn->state = IBWC_ERROR;
574         pctx->connstate_func(NULL, conn);
575 }
576
577 static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
578 {
579         int     i;
580         const char *name, *value;
581
582         opts->max_send_wr = 256;
583         opts->max_recv_wr = 1024;
584         opts->avg_send_size = 1024;
585         opts->recv_bufsize = 256;
586
587         for(i=0; i<nattr; i++) {
588                 name = attr[i].name;
589                 value = attr[i].value;
590
591                 assert(name!=NULL && value!=NULL);
592                 if (strcmp(name, "max_send_wr")==0)
593                         opts->max_send_wr = atoi(value);
594                 else if (strcmp(name, "max_recv_wr")==0)
595                         opts->max_recv_wr = atoi(value);
596                 else if (strcmp(name, "avg_send_size")==0)
597                         opts->avg_send_size = atoi(value);
598                 else if (strcmp(name, "recv_bufsize")==0)
599                         opts->recv_bufsize = atoi(value);
600                 else {
601                         sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
602                         return -1;
603                 }
604         }
605         return 0;
606 }
607
608 struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
609         void *ctx_userdata,
610         ibw_connstate_fn_t ibw_connstate,
611         ibw_receive_fn_t ibw_receive,
612         struct event_context *ectx)
613 {
614         struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
615         struct ibw_ctx_priv *pctx;
616         int     rc;
617
618         /* initialize basic data structures */
619         memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
620
621         assert(ctx!=NULL);
622         ibw_lasterr[0] = '\0';
623         talloc_set_destructor(ctx, ibw_ctx_destruct);
624         ctx->ctx_userdata = ctx_userdata;
625
626         pctx = talloc_zero(ctx, struct ibw_ctx_priv);
627         talloc_set_destructor(pctx, ibw_ctx_priv_destruct);
628         ctx->internal = (void *)pctx;
629         assert(pctx!=NULL);
630
631         pctx->connstate_func = ibw_connstate;
632         pctx->receive_func = ibw_receive;
633
634         pctx->ectx = ectx;
635
636         /* process attributes */
637         if (ibw_process_init_attrs(attr, nattr, &pctx->opts))
638                 goto cleanup;
639
640         /* init cm */
641         pctx->cm_channel = rdma_create_event_channel();
642         if (!pctx->cm_channel) {
643                 sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
644                 goto cleanup;
645         }
646
647         pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,
648                 pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);
649
650         rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP);
651         if (rc) {
652                 rc = errno;
653                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
654                 goto cleanup;
655         }
656         DEBUG(10, ("created cm_id %p\n", pctx->cm_id));
657
658         /* init verbs */
659         pctx->pd = ibv_alloc_pd(pctx->cm_id->verbs);
660         if (!pctx->pd) {
661                 sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
662                 goto cleanup;
663         }
664         DEBUG(10, ("created pd %p\n", pctx->pd));
665
666         pctx->pagesize = sysconf(_SC_PAGESIZE);
667
668         return ctx;
669         /* don't put code here */
670 cleanup:
671         DEBUG(0, (ibw_lasterr));
672
673         if (ctx)
674                 talloc_free(ctx);
675
676         return NULL;
677 }
678
679 int ibw_stop(struct ibw_ctx *ctx)
680 {
681         struct ibw_conn *p;
682
683         for(p=ctx->conn_list; p!=NULL; p=p->next) {
684                 if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) {
685                         if (ibw_disconnect(p))
686                                 return -1;
687                 }
688         }
689
690         return 0;
691 }
692
693 int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr)
694 {
695         struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal;
696         int     rc;
697
698         rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);
699         if (rc) {
700                 sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);
701                 DEBUG(0, (ibw_lasterr));
702                 return rc;
703         }
704         DEBUG(10, ("rdma_bind_addr successful\n"));
705
706         return 0;
707 }
708
709 int ibw_listen(struct ibw_ctx *ctx, int backlog)
710 {
711         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
712         int     rc;
713
714         DEBUG(10, ("rdma_listen...\n"));
715         rc = rdma_listen(pctx->cm_id, backlog);
716         if (rc) {
717                 sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);
718                 DEBUG(0, (ibw_lasterr));
719                 return rc;
720         }       
721
722         return 0;
723 }
724
725 int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata)
726 {
727         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
728         struct rdma_conn_param  conn_param;
729         int     rc;
730
731         conn->conn_userdata = conn_userdata;
732
733         memset(&conn_param, 0, sizeof(struct rdma_conn_param));
734         conn_param.responder_resources = 1;
735         conn_param.initiator_depth = 1;
736         rc = rdma_accept(pconn->cm_id, &conn_param);
737         if (rc) {
738                 sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);
739                 DEBUG(0, (ibw_lasterr));
740                 return -1;;
741         }
742
743         pconn->is_accepted = 1;
744
745         /* continued at RDMA_CM_EVENT_ESTABLISHED */
746
747         return 0;
748 }
749
750 int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
751 {
752         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
753         struct ibw_conn *conn = NULL;
754         struct ibw_conn_priv *pconn = NULL;
755         int     rc;
756
757         conn = ibw_conn_new(ctx);
758         conn->conn_userdata = conn_userdata;
759         pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
760
761         rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
762         if (rc) {
763                 rc = errno;
764                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
765                 return rc;
766         }
767
768         rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
769         if (rc) {
770                 sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
771                 DEBUG(0, (ibw_lasterr));
772                 return -1;
773         }
774
775         /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
776
777         return 0;
778 }
779
780 int ibw_disconnect(struct ibw_conn *conn)
781 {
782         int     rc;
783         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
784
785         rc = rdma_disconnect(pctx->cm_id);
786         if (rc) {
787                 sprintf(ibw_lasterr, "ibw_disconnect failed with %d", rc);
788                 DEBUG(0, (ibw_lasterr));
789                 return rc;
790         }
791
792         /* continued at RDMA_CM_EVENT_DISCONNECTED */
793
794         return 0;
795 }
796
797 int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
798 {
799         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
800         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
801         struct ibw_wr *p = pconn->wr_list_avail;
802
803         if (p==NULL) {
804                 sprintf(ibw_lasterr, "insufficient wr chunks\n");
805                 return -1;
806         }
807
808         DLIST_REMOVE(pconn->wr_list_avail, p);
809         DLIST_ADD(pconn->wr_list_used, p);
810
811         if (n + sizeof(long) <= pctx->opts.avg_send_size) {
812                 *buf = (void *)(p->msg + sizeof(long));
813                 *key = (void *)p;
814         } else {
815                 p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
816                 if (!p->msg_large) {
817                         sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
818                         DEBUG(0, (ibw_lasterr));
819                         return -1;
820                 }
821                 *buf = (void *)(p->msg_large + sizeof(long));
822         }
823
824         return 0;
825 }
826
827 int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
828 {
829         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
830         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
831         struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
832         struct ibv_sge list = {
833                 .addr   = (uintptr_t) NULL,
834                 .length = n,
835                 .lkey   = 0
836         };
837         struct ibv_send_wr wr = {
838                 .wr_id      = p->wr_id,
839                 .sg_list    = &list,
840                 .num_sge    = 1,
841                 .opcode     = IBV_WR_SEND,
842                 .send_flags = IBV_SEND_SIGNALED,
843         };
844         struct ibv_send_wr *bad_wr;
845
846         if (n + sizeof(long)<=pctx->opts.avg_send_size) {
847                 assert((p->msg + sizeof(long))==(char *)buf);
848                 list.lkey = pconn->mr_send->lkey;
849                 list.addr = (uintptr_t) p->msg;
850
851                 *((uint32_t *)p->msg) = htonl(n);
852         } else {
853                 assert((p->msg_large + sizeof(long))==(char *)buf);
854                 assert(p->mr_large!=NULL);
855                 list.lkey = p->mr_large->lkey;
856                 list.addr = (uintptr_t) p->msg_large;
857
858                 *((uint32_t *)p->msg_large) = htonl(n);
859         }
860
861         return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
862 }
863
864 const char *ibw_getLastError(void)
865 {
866         return ibw_lasterr;
867 }