Made receiver handle partial packets.
[metze/samba/wip.git] / ctdb / ib / ibwrapper.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Wrap Infiniband calls.
4  *
5  * Copyright (C) Sven Oehme <oehmes@de.ibm.com> 2006
6  *
7  * Major code contributions by Peter Somogyi <psomogyi@gamax.hu>
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 2 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include <stdlib.h>
25 #include <string.h>
26 #include <stdio.h>
27 #include <errno.h>
28 #include <sys/types.h>
29 #include <netinet/in.h>
30 #include <sys/socket.h>
31 #include <netdb.h>
32 #include <arpa/inet.h>
33 #include <malloc.h>
34 #include <assert.h>
35 #include <unistd.h>
36
37 #include "includes.h"
38 #include "lib/events/events.h"
39 #include "ibwrapper.h"
40
41 #include <rdma/rdma_cma.h>
42
43 #include "ibwrapper_internal.h"
44 #include "lib/util/dlinklist.h"
45
46 #define IBW_LASTERR_BUFSIZE 512
47 static char ibw_lasterr[IBW_LASTERR_BUFSIZE];
48
49 static void ibw_event_handler_verbs(struct event_context *ev,
50         struct fd_event *fde, uint16_t flags, void *private_data);
51 static int ibw_fill_cq(struct ibw_conn *conn);
52 static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc);
53 static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc);
54
55 static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
56         int n, struct ibv_mr **ppmr)
57 {
58         void *buf;
59         buf = memalign(pctx->pagesize, n);
60         if (!buf) {
61                 sprintf(ibw_lasterr, "couldn't allocate memory\n");
62                 return NULL;
63         }
64
65         *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE);
66         if (!*ppmr) {
67                 sprintf(ibw_lasterr, "couldn't allocate mr\n");
68                 free(buf);
69                 return NULL;
70         }
71
72         return buf;
73 }
74
75 static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
76 {
77         if (*ppmr!=NULL) {
78                 ibv_dereg_mr(*ppmr);
79                 *ppmr = NULL;
80         }
81         if (*ppbuf) {
82                 free(*ppbuf);
83                 *ppbuf = NULL;
84         }
85 }
86
87 static int ibw_init_memory(struct ibw_conn *conn)
88 {
89         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
90         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
91
92         int     i;
93         struct ibw_wr   *p;
94
95         pconn->buf_send = ibw_alloc_mr(pctx, pconn,
96                 pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
97         if (!pconn->buf_send) {
98                 sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
99                 return -1;
100         }
101
102         pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
103                 pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
104         if (!pconn->buf_recv) {
105                 sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
106                 return -1;
107         }
108
109         pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
110         assert(pconn->wr_index!=NULL);
111
112         for(i=0; i<pctx->opts.max_send_wr; i++) {
113                 p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
114                 p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
115                 p->wr_id = i;
116
117                 DLIST_ADD(pconn->wr_list_avail, p);
118         }
119
120         return 0;
121 }
122
123 static int ibw_ctx_priv_destruct(struct ibw_ctx_priv *pctx)
124 {
125         if (pctx->pd) {
126                 ibv_dealloc_pd(pctx->pd);
127                 pctx->pd = NULL;
128         }
129
130         /* destroy cm */
131         if (pctx->cm_channel) {
132                 rdma_destroy_event_channel(pctx->cm_channel);
133                 pctx->cm_channel = NULL;
134         }
135         if (pctx->cm_channel_event) {
136                 /* TODO: do we have to do this here? */
137                 talloc_free(pctx->cm_channel_event);
138                 pctx->cm_channel_event = NULL;
139         }
140         if (pctx->cm_id) {
141                 rdma_destroy_id(pctx->cm_id);
142                 pctx->cm_id = NULL;
143         }
144
145         return 0;
146 }
147
148 static int ibw_ctx_destruct(struct ibw_ctx *ctx)
149 {
150         return 0;
151 }
152
153 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
154 {
155         /* free memory regions */
156         ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
157         ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
158
159         /* pconn->wr_index is freed by talloc */
160         /* pconn->wr_index[i] are freed by talloc */
161
162         /* destroy verbs */
163         if (pconn->cm_id->qp) {
164                 ibv_destroy_qp(pconn->cm_id->qp);
165                 pconn->cm_id->qp = NULL;
166         }
167         if (pconn->cq) {
168                 ibv_destroy_cq(pconn->cq);
169                 pconn->cq = NULL;
170         }
171         if (pconn->verbs_channel) {
172                 ibv_destroy_comp_channel(pconn->verbs_channel);
173                 pconn->verbs_channel = NULL;
174         }
175         if (pconn->verbs_channel_event) {
176                 /* TODO: do we have to do this here? */
177                 talloc_free(pconn->verbs_channel_event);
178                 pconn->verbs_channel_event = NULL;
179         }
180         if (pconn->cm_id) {
181                 rdma_destroy_id(pconn->cm_id);
182                 pconn->cm_id = NULL;
183         }
184         return 0;
185 }
186
187 static int ibw_conn_destruct(struct ibw_conn *conn)
188 {
189         /* important here: ctx is a talloc _parent_ */
190         DLIST_REMOVE(conn->ctx->conn_list, conn);
191         return 0;
192 }
193
194 static struct ibw_conn *ibw_conn_new(struct ibw_ctx *ctx)
195 {
196         struct ibw_conn *conn;
197         struct ibw_conn_priv *pconn;
198
199         conn = talloc_zero(ctx, struct ibw_conn);
200         assert(conn!=NULL);
201         talloc_set_destructor(conn, ibw_conn_destruct);
202
203         pconn = talloc_zero(ctx, struct ibw_conn_priv);
204         assert(pconn!=NULL);
205         talloc_set_destructor(pconn, ibw_conn_priv_destruct);
206
207         conn->ctx = ctx;
208
209         DLIST_ADD(ctx->conn_list, conn);
210
211         return conn;
212 }
213
214 static int ibw_setup_cq_qp(struct ibw_conn *conn)
215 {
216         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
217         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
218         struct ibv_qp_init_attr init_attr;
219         int rc;
220
221         /* init mr */
222         if (ibw_init_memory(conn))
223                 return -1;
224
225         /* init verbs */
226         pconn->verbs_channel = ibv_create_comp_channel(pconn->cm_id->verbs);
227         if (!pconn->verbs_channel) {
228                 sprintf(ibw_lasterr, "ibv_create_comp_channel failed %d\n", errno);
229                 return -1;
230         }
231         DEBUG(10, ("created channel %p\n", pconn->verbs_channel));
232
233         pconn->verbs_channel_event = event_add_fd(pctx->ectx, conn,
234                 pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
235
236         /* init cq */
237         pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
238                 pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
239                 conn, pconn->verbs_channel, 0);
240         if (pconn->cq==NULL) {
241                 sprintf(ibw_lasterr, "ibv_create_cq failed\n");
242                 return -1;
243         }
244
245         rc = ibv_req_notify_cq(pconn->cq, 0);
246         if (rc) {
247                 sprintf(ibw_lasterr, "ibv_req_notify_cq failed with %d\n", rc);
248                 return rc;
249         }
250
251         /* init qp */
252         memset(&init_attr, 0, sizeof(init_attr));
253         init_attr.cap.max_send_wr = pctx->opts.max_send_wr;
254         init_attr.cap.max_recv_wr = pctx->opts.max_recv_wr;
255         init_attr.cap.max_recv_sge = 1;
256         init_attr.cap.max_send_sge = 1;
257         init_attr.qp_type = IBV_QPT_RC;
258         init_attr.send_cq = pconn->cq;
259         init_attr.recv_cq = pconn->cq;
260
261         rc = rdma_create_qp(pconn->cm_id, pctx->pd, &init_attr);
262         if (rc) {
263                 sprintf(ibw_lasterr, "rdma_create_qp failed with %d\n", rc);
264                 return rc;
265         }
266         /* elase result is in pconn->cm_id->qp */
267
268         return ibw_fill_cq(conn);
269 }
270
271 static int ibw_refill_cq_recv(struct ibw_conn *conn)
272 {
273         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
274         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
275         int     rc;
276         struct ibv_sge list = {
277                 .addr   = (uintptr_t) NULL,
278                 .length = pctx->opts.recv_bufsize,
279                 .lkey   = pconn->mr_recv->lkey
280         };
281         struct ibv_recv_wr wr = {
282                 .wr_id      = 0,
283                 .sg_list    = &list,
284                 .num_sge    = 1,
285         };
286         struct ibv_recv_wr *bad_wr;
287
288         list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
289         wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
290         pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
291
292         rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
293         if (rc) {
294                 sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
295                 DEBUG(0, (ibw_lasterr));
296                 return -2;
297         }
298
299         return 0;
300 }
301
302 static int ibw_fill_cq(struct ibw_conn *conn)
303 {
304         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
305         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
306         int     i, rc;
307         struct ibv_sge list = {
308                 .addr   = (uintptr_t) NULL,
309                 .length = pctx->opts.recv_bufsize,
310                 .lkey   = pconn->mr_recv->lkey
311         };
312         struct ibv_recv_wr wr = {
313                 .wr_id      = 0,
314                 .sg_list    = &list,
315                 .num_sge    = 1,
316         };
317         struct ibv_recv_wr *bad_wr;
318
319         for(i = pctx->opts.max_recv_wr; i!=0; i--) {
320                 list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
321                 wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
322                 pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
323
324                 rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
325                 if (rc) {
326                         sprintf(ibw_lasterr, "ibv_post_recv failed with %d\n", rc);
327                         DEBUG(0, (ibw_lasterr));
328                         return -2;
329                 }
330         }
331
332         return 0;
333 }
334
335 static int ibw_manage_connect(struct ibw_conn *conn, struct rdma_cm_id *cma_id)
336 {
337         struct rdma_conn_param conn_param;
338         int     rc;
339
340         rc = ibw_setup_cq_qp(conn);
341         if (rc)
342                 return -1;
343
344         /* cm connect */
345         memset(&conn_param, 0, sizeof conn_param);
346         conn_param.responder_resources = 1;
347         conn_param.initiator_depth = 1;
348         conn_param.retry_count = 10;
349
350         rc = rdma_connect(cma_id, &conn_param);
351         if (rc)
352                 sprintf(ibw_lasterr, "rdma_connect error %d\n", rc);
353
354         return rc;
355 }
356
357 static void ibw_event_handler_cm(struct event_context *ev,
358         struct fd_event *fde, uint16_t flags, void *private_data)
359 {
360         int     rc;
361         struct ibw_ctx  *ctx = talloc_get_type(private_data, struct ibw_ctx);
362         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
363         struct ibw_conn *conn = NULL;
364         struct ibw_conn_priv *pconn = NULL;
365         struct rdma_cm_id *cma_id = NULL;
366         struct rdma_cm_event *event = NULL;
367
368         assert(ctx!=NULL);
369
370         rc = rdma_get_cm_event(pctx->cm_channel, &event);
371         if (rc) {
372                 ctx->state = IBWS_ERROR;
373                 sprintf(ibw_lasterr, "rdma_get_cm_event error %d\n", rc);
374                 goto error;
375         }
376         cma_id = event->id;
377
378         DEBUG(10, ("cma_event type %d cma_id %p (%s)\n", event->event, cma_id,
379                   (cma_id == pctx->cm_id) ? "parent" : "child"));
380
381         switch (event->event) {
382         case RDMA_CM_EVENT_ADDR_RESOLVED:
383                 /* continuing from ibw_connect ... */
384                 rc = rdma_resolve_route(cma_id, 2000);
385                 if (rc) {
386                         sprintf(ibw_lasterr, "rdma_resolve_route error %d\n", rc);
387                         goto error;
388                 }
389                 /* continued at RDMA_CM_EVENT_ROUTE_RESOLVED */
390                 break;
391
392         case RDMA_CM_EVENT_ROUTE_RESOLVED:
393                 /* after RDMA_CM_EVENT_ADDR_RESOLVED: */
394                 assert(cma_id->context!=NULL);
395                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
396
397                 rc = ibw_manage_connect(conn, cma_id);
398                 if (rc)
399                         goto error;
400
401                 break;
402
403         case RDMA_CM_EVENT_CONNECT_REQUEST:
404                 ctx->state = IBWS_CONNECT_REQUEST;
405                 conn = ibw_conn_new(ctx);
406                 pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
407                 pconn->cm_id = cma_id; /* !!! event will be freed but id not */
408                 cma_id->context = (void *)conn;
409                 DEBUG(10, ("pconn->cm_id %p\n", pconn->cm_id));
410
411                 conn->state = IBWC_INIT;
412                 pctx->connstate_func(ctx, conn);
413
414                 /* continued at ibw_accept when invoked by the func above */
415                 if (!pconn->is_accepted) {
416                         talloc_free(conn);
417                         DEBUG(10, ("pconn->cm_id %p wasn't accepted\n", pconn->cm_id));
418                 } else {
419                         if (ibw_setup_cq_qp(conn))
420                                 goto error;
421                 }
422
423                 /* TODO: clarify whether if it's needed by upper layer: */
424                 ctx->state = IBWS_READY;
425                 pctx->connstate_func(ctx, NULL);
426
427                 /* NOTE: more requests can arrive until RDMA_CM_EVENT_ESTABLISHED ! */
428                 break;
429
430         case RDMA_CM_EVENT_ESTABLISHED:
431                 /* expected after ibw_accept and ibw_connect[not directly] */
432                 DEBUG(0, ("ESTABLISHED (conn: %u)\n", (unsigned int)cma_id->context));
433                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
434                 assert(conn!=NULL); /* important assumption */
435
436                 /* client conn is up */
437                 conn->state = IBWC_CONNECTED;
438
439                 /* both ctx and conn have changed */
440                 pctx->connstate_func(ctx, conn);
441                 break;
442
443         case RDMA_CM_EVENT_ADDR_ERROR:
444         case RDMA_CM_EVENT_ROUTE_ERROR:
445         case RDMA_CM_EVENT_CONNECT_ERROR:
446         case RDMA_CM_EVENT_UNREACHABLE:
447         case RDMA_CM_EVENT_REJECTED:
448                 sprintf(ibw_lasterr, "cma event %d, error %d\n", event->event, event->status);
449                 goto error;
450
451         case RDMA_CM_EVENT_DISCONNECTED:
452                 if (cma_id!=pctx->cm_id) {
453                         DEBUG(0, ("client DISCONNECT event\n"));
454                         conn = talloc_get_type(cma_id->context, struct ibw_conn);
455                         conn->state = IBWC_DISCONNECTED;
456                         pctx->connstate_func(NULL, conn);
457
458                         talloc_free(conn);
459
460                         /* if we are the last... */
461                         if (ctx->conn_list==NULL)
462                                 rdma_disconnect(pctx->cm_id);
463                 } else {
464                         DEBUG(0, ("server DISCONNECT event\n"));
465                         ctx->state = IBWS_STOPPED; /* ??? TODO: try it... */
466                         /* talloc_free(ctx) should be called within or after this func */
467                         pctx->connstate_func(ctx, NULL);
468                 }
469                 break;
470
471         case RDMA_CM_EVENT_DEVICE_REMOVAL:
472                 sprintf(ibw_lasterr, "cma detected device removal!\n");
473                 goto error;
474
475         default:
476                 sprintf(ibw_lasterr, "unknown event %d\n", event->event);
477                 goto error;
478         }
479
480         if ((rc=rdma_ack_cm_event(event))) {
481                 sprintf(ibw_lasterr, "rdma_ack_cm_event failed with %d\n", rc);
482                 goto error;
483         }
484
485         return;
486 error:
487         DEBUG(0, ("cm event handler: %s", ibw_lasterr));
488         if (cma_id!=pctx->cm_id) {
489                 conn = talloc_get_type(cma_id->context, struct ibw_conn);
490                 if (conn)
491                         conn->state = IBWC_ERROR;
492                 pctx->connstate_func(NULL, conn);
493         } else {
494                 ctx->state = IBWS_ERROR;
495                 pctx->connstate_func(ctx, NULL);
496         }
497 }
498
499 static void ibw_event_handler_verbs(struct event_context *ev,
500         struct fd_event *fde, uint16_t flags, void *private_data)
501 {
502         struct ibw_conn *conn = talloc_get_type(private_data, struct ibw_conn);
503         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
504         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
505
506         struct ibv_wc wc;
507         int rc;
508         struct ibv_cq *ev_cq;
509         void          *ev_ctx;
510
511         /* TODO: check whether if it's good to have more channels here... */
512         rc = ibv_get_cq_event(pconn->verbs_channel, &ev_cq, &ev_ctx);
513         if (rc) {
514                 sprintf(ibw_lasterr, "Failed to get cq_event with %d\n", rc);
515                 goto error;
516         }
517         if (ev_cq != pconn->cq) {
518                 sprintf(ibw_lasterr, "ev_cq(%u) != pconn->cq(%u)\n",
519                         (unsigned int)ev_cq, (unsigned int)pconn->cq);
520                 goto error;
521         }
522         rc = ibv_req_notify_cq(pconn->cq, 0);
523         if (rc) {
524                 sprintf(ibw_lasterr, "Couldn't request CQ notification (%d)\n", rc);
525                 goto error;
526         }
527
528         while((rc=ibv_poll_cq(pconn->cq, 1, &wc))==1) {
529                 if (wc.status) {
530                         sprintf(ibw_lasterr, "cq completion failed status %d\n",
531                                 wc.status);
532                         goto error;
533                 }
534
535                 switch(wc.opcode) {
536                 case IBV_WC_SEND:
537                         DEBUG(10, ("send completion\n"));
538                         if (ibw_wc_send(conn, &wc))
539                                 goto error;
540                         break;
541
542                 case IBV_WC_RDMA_WRITE:
543                         DEBUG(10, ("rdma write completion\n"));
544                         break;
545         
546                 case IBV_WC_RDMA_READ:
547                         DEBUG(10, ("rdma read completion\n"));
548                         break;
549
550                 case IBV_WC_RECV:
551                         DEBUG(10, ("recv completion\n"));
552                         if (ibw_wc_recv(conn, &wc))
553                                 goto error;
554                         break;
555
556                 default:
557                         sprintf(ibw_lasterr, "unknown completion %d\n", wc.opcode);
558                         goto error;
559                 }
560         }
561         if (rc!=0) {
562                 sprintf(ibw_lasterr, "ibv_poll_cq error %d\n", rc);
563                 goto error;
564         }
565
566         return;
567 error:
568         DEBUG(0, (ibw_lasterr));
569         conn->state = IBWC_ERROR;
570         pctx->connstate_func(NULL, conn);
571 }
572
573 static inline int ibw_wc_send(struct ibw_conn *conn, struct ibv_wc *wc)
574 {
575         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
576         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
577         struct ibw_wr   *p;
578
579         assert(pconn->cm_id->qp->qp_num==wc->qp_num);
580         assert(wc->wr_id < pctx->opts.max_send_wr);
581
582         p = pconn->wr_index[wc->wr_id];
583         if (p->msg_large) {
584                 ibw_free_mr(&p->msg_large, &p->mr_large);
585         }
586
587         DLIST_REMOVE(pconn->wr_list_used, p);
588         DLIST_ADD(pconn->wr_list_avail, p);
589
590         return 0;
591 }
592
593 static inline int ibw_append_to_part(void *memctx, struct ibw_part *part,
594         char **pp, uint32_t add_len, int info)
595 {
596         /* allocate more if necessary - it's an "evergrowing" buffer... */
597         if (part->len + add_len > part->bufsize) {
598                 if (part->buf==NULL) {
599                         assert(part->len==0);
600                         part->buf = talloc_size(memctx, add_len);
601                         if (part->buf==NULL) {
602                                 sprintf(ibw_lasterr, "recv talloc_size error (%u) #%d\n",
603                                         add_len, info);
604                                 return -1;
605                         }
606                         part->bufsize = add_len;
607                 } else {
608                         part->buf = talloc_realloc_size(memctx,
609                                 part->buf, part->len + add_len);
610                         if (part->buf==NULL) {
611                                 sprintf(ibw_lasterr, "recv realloc error (%u + %u) #%d\n",
612                                         part->len, add_len, info);
613                                 return -1;
614                         }
615                 }
616                 part->bufsize = part->len + add_len;
617         }
618
619         /* consume pp */
620         memcpy(part->buf + part->len, *pp, add_len);
621         *pp += add_len;
622         part->len += add_len;
623         part->to_read -= add_len;
624
625         return 0;
626 }
627
628 static inline int ibw_wc_mem_threshold(void *memctx, struct ibw_part *part, uint32_t threshold)
629 {
630         if (part->bufsize > threshold) {
631                 talloc_free(part->buf);
632                 part->buf = talloc_size(memctx, threshold);
633                 if (part->buf==NULL) {
634                         sprintf(ibw_lasterr, "talloc_size failed\n");
635                         return -1;
636                 }
637                 part->bufsize = threshold;
638         }
639         return 0;
640 }
641
642 static inline int ibw_wc_recv(struct ibw_conn *conn, struct ibv_wc *wc)
643 {
644         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
645         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
646         int     recv_index;
647         char    *p;
648         uint32_t        remain;
649         struct ibw_part *part;
650
651         assert(pconn->cm_id->qp->qp_num==wc->qp_num);
652         assert((int)wc->wr_id > pctx->opts.max_send_wr);
653         recv_index = (int)wc->wr_id - pctx->opts.max_send_wr;
654         assert(recv_index < pctx->opts.max_recv_wr);
655         assert(wc->byte_len <= pctx->opts.recv_bufsize);
656
657         p = pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize);
658         part = &pconn->part;
659
660         remain = wc->byte_len;
661         while(remain) {
662                 /* here always true: (part->len!=0 && part->to_read!=0) ||
663                         (part->len==0 && part->to_read==0) */
664                 if (part->len) { /* is there a partial msg to be continued? */
665                         int read_len = (part->to_read<=remain) ? part->to_read : remain;
666                         if (ibw_append_to_part(pconn, part, &p, read_len, 421))
667                                 goto error;
668                         remain -= read_len;
669
670                         if (part->len<=sizeof(uint32_t) && part->to_read==0) {
671                                 assert(part->len==sizeof(uint32_t));
672                                 /* set it again now... */
673                                 part->to_read = *((uint32_t *)(part->buf));
674                                 if (part->to_read<sizeof(uint32_t)) {
675                                         sprintf(ibw_lasterr, "got msglen=%u #2\n", part->to_read);
676                                         goto error;
677                                 }
678                                 part->to_read -= sizeof(uint32_t); /* it's already read */
679                         }
680
681                         if (part->to_read==0) {
682                                 pctx->receive_func(conn, part->buf, part->len);
683                                 part->len = 0; /* tells not having partial data (any more) */
684                                 if (ibw_wc_mem_threshold(pconn, part, pctx->opts.recv_threshold))
685                                         goto error;
686                         }
687                 } else {
688                         if (remain>=sizeof(uint32_t)) {
689                                 uint32_t msglen = *(uint32_t *)p;
690                                 if (msglen<sizeof(uint32_t)) {
691                                         sprintf(ibw_lasterr, "got msglen=%u\n", msglen);
692                                         goto error;
693                                 }
694
695                                 /* mostly awaited case: */
696                                 if (msglen<=remain) {
697                                         pctx->receive_func(conn, p, msglen);
698                                         p += msglen;
699                                         remain -= msglen;
700                                 } else {
701                                         part->to_read = msglen;
702                                         /* part->len is already 0 */
703                                         if (ibw_append_to_part(pconn, part, &p, remain, 422))
704                                                 goto error;
705                                         remain = 0; /* to be continued ... */
706                                         /* part->to_read > 0 here */
707                                 }
708                         } else { /* edge case: */
709                                 part->to_read = sizeof(uint32_t);
710                                 /* part->len is already 0 */
711                                 if (ibw_append_to_part(pconn, part, &p, remain, 423))
712                                         goto error;
713                                 remain = 0;
714                                 /* part->to_read > 0 here */
715                         }
716                 }
717         } /* <remain> is always decreased at least by 1 */
718
719         if (ibw_refill_cq_recv(conn))
720                 goto error;
721
722         return 0;
723
724 error:
725         DEBUG(0, ("ibw_wc_recv error: %s", ibw_lasterr));
726         conn->state = IBWC_ERROR;
727         return -1;
728 }
729
730 static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct ibw_opts *opts)
731 {
732         int     i;
733         const char *name, *value;
734
735         opts->max_send_wr = 256;
736         opts->max_recv_wr = 1024;
737         opts->avg_send_size = 1024;
738         opts->recv_bufsize = 256;
739         opts->recv_threshold = 1 * 1024 * 1024;
740
741         for(i=0; i<nattr; i++) {
742                 name = attr[i].name;
743                 value = attr[i].value;
744
745                 assert(name!=NULL && value!=NULL);
746                 if (strcmp(name, "max_send_wr")==0)
747                         opts->max_send_wr = atoi(value);
748                 else if (strcmp(name, "max_recv_wr")==0)
749                         opts->max_recv_wr = atoi(value);
750                 else if (strcmp(name, "avg_send_size")==0)
751                         opts->avg_send_size = atoi(value);
752                 else if (strcmp(name, "recv_bufsize")==0)
753                         opts->recv_bufsize = atoi(value);
754                 else if (strcmp(name, "recv_threshold")==0)
755                         opts->recv_threshold = atoi(value);
756                 else {
757                         sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
758                         return -1;
759                 }
760         }
761         return 0;
762 }
763
764 struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
765         void *ctx_userdata,
766         ibw_connstate_fn_t ibw_connstate,
767         ibw_receive_fn_t ibw_receive,
768         struct event_context *ectx)
769 {
770         struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
771         struct ibw_ctx_priv *pctx;
772         int     rc;
773
774         /* initialize basic data structures */
775         memset(ibw_lasterr, 0, IBW_LASTERR_BUFSIZE);
776
777         assert(ctx!=NULL);
778         ibw_lasterr[0] = '\0';
779         talloc_set_destructor(ctx, ibw_ctx_destruct);
780         ctx->ctx_userdata = ctx_userdata;
781
782         pctx = talloc_zero(ctx, struct ibw_ctx_priv);
783         talloc_set_destructor(pctx, ibw_ctx_priv_destruct);
784         ctx->internal = (void *)pctx;
785         assert(pctx!=NULL);
786
787         pctx->connstate_func = ibw_connstate;
788         pctx->receive_func = ibw_receive;
789
790         pctx->ectx = ectx;
791
792         /* process attributes */
793         if (ibw_process_init_attrs(attr, nattr, &pctx->opts))
794                 goto cleanup;
795
796         /* init cm */
797         pctx->cm_channel = rdma_create_event_channel();
798         if (!pctx->cm_channel) {
799                 sprintf(ibw_lasterr, "rdma_create_event_channel error %d\n", errno);
800                 goto cleanup;
801         }
802
803         pctx->cm_channel_event = event_add_fd(pctx->ectx, pctx,
804                 pctx->cm_channel->fd, EVENT_FD_READ, ibw_event_handler_cm, ctx);
805
806         rc = rdma_create_id(pctx->cm_channel, &pctx->cm_id, ctx, RDMA_PS_TCP);
807         if (rc) {
808                 rc = errno;
809                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
810                 goto cleanup;
811         }
812         DEBUG(10, ("created cm_id %p\n", pctx->cm_id));
813
814         /* init verbs */
815         pctx->pd = ibv_alloc_pd(pctx->cm_id->verbs);
816         if (!pctx->pd) {
817                 sprintf(ibw_lasterr, "ibv_alloc_pd failed %d\n", errno);
818                 goto cleanup;
819         }
820         DEBUG(10, ("created pd %p\n", pctx->pd));
821
822         pctx->pagesize = sysconf(_SC_PAGESIZE);
823
824         return ctx;
825         /* don't put code here */
826 cleanup:
827         DEBUG(0, (ibw_lasterr));
828
829         if (ctx)
830                 talloc_free(ctx);
831
832         return NULL;
833 }
834
835 int ibw_stop(struct ibw_ctx *ctx)
836 {
837         struct ibw_conn *p;
838
839         for(p=ctx->conn_list; p!=NULL; p=p->next) {
840                 if (ctx->state==IBWC_ERROR || ctx->state==IBWC_CONNECTED) {
841                         if (ibw_disconnect(p))
842                                 return -1;
843                 }
844         }
845
846         return 0;
847 }
848
849 int ibw_bind(struct ibw_ctx *ctx, struct sockaddr_in *my_addr)
850 {
851         struct ibw_ctx_priv *pctx = (struct ibw_ctx_priv *)ctx->internal;
852         int     rc;
853
854         rc = rdma_bind_addr(pctx->cm_id, (struct sockaddr *) my_addr);
855         if (rc) {
856                 sprintf(ibw_lasterr, "rdma_bind_addr error %d\n", rc);
857                 DEBUG(0, (ibw_lasterr));
858                 return rc;
859         }
860         DEBUG(10, ("rdma_bind_addr successful\n"));
861
862         return 0;
863 }
864
865 int ibw_listen(struct ibw_ctx *ctx, int backlog)
866 {
867         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
868         int     rc;
869
870         DEBUG(10, ("rdma_listen...\n"));
871         rc = rdma_listen(pctx->cm_id, backlog);
872         if (rc) {
873                 sprintf(ibw_lasterr, "rdma_listen failed: %d\n", rc);
874                 DEBUG(0, (ibw_lasterr));
875                 return rc;
876         }       
877
878         return 0;
879 }
880
881 int ibw_accept(struct ibw_ctx *ctx, struct ibw_conn *conn, void *conn_userdata)
882 {
883         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
884         struct rdma_conn_param  conn_param;
885         int     rc;
886
887         conn->conn_userdata = conn_userdata;
888
889         memset(&conn_param, 0, sizeof(struct rdma_conn_param));
890         conn_param.responder_resources = 1;
891         conn_param.initiator_depth = 1;
892         rc = rdma_accept(pconn->cm_id, &conn_param);
893         if (rc) {
894                 sprintf(ibw_lasterr, "rdma_accept failed %d\n", rc);
895                 DEBUG(0, (ibw_lasterr));
896                 return -1;;
897         }
898
899         pconn->is_accepted = 1;
900
901         /* continued at RDMA_CM_EVENT_ESTABLISHED */
902
903         return 0;
904 }
905
906 int ibw_connect(struct ibw_ctx *ctx, struct sockaddr_in *serv_addr, void *conn_userdata)
907 {
908         struct ibw_ctx_priv *pctx = talloc_get_type(ctx->internal, struct ibw_ctx_priv);
909         struct ibw_conn *conn = NULL;
910         struct ibw_conn_priv *pconn = NULL;
911         int     rc;
912
913         conn = ibw_conn_new(ctx);
914         conn->conn_userdata = conn_userdata;
915         pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
916
917         rc = rdma_create_id(pctx->cm_channel, &pconn->cm_id, conn, RDMA_PS_TCP);
918         if (rc) {
919                 rc = errno;
920                 sprintf(ibw_lasterr, "rdma_create_id error %d\n", rc);
921                 return rc;
922         }
923
924         rc = rdma_resolve_addr(pconn->cm_id, NULL, (struct sockaddr *) &serv_addr, 2000);
925         if (rc) {
926                 sprintf(ibw_lasterr, "rdma_resolve_addr error %d\n", rc);
927                 DEBUG(0, (ibw_lasterr));
928                 return -1;
929         }
930
931         /* continued at RDMA_CM_EVENT_ADDR_RESOLVED */
932
933         return 0;
934 }
935
936 int ibw_disconnect(struct ibw_conn *conn)
937 {
938         int     rc;
939         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
940
941         rc = rdma_disconnect(pctx->cm_id);
942         if (rc) {
943                 sprintf(ibw_lasterr, "ibw_disconnect failed with %d", rc);
944                 DEBUG(0, (ibw_lasterr));
945                 return rc;
946         }
947
948         /* continued at RDMA_CM_EVENT_DISCONNECTED */
949
950         return 0;
951 }
952
953 int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
954 {
955         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
956         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
957         struct ibw_wr *p = pconn->wr_list_avail;
958
959         if (p==NULL) {
960                 sprintf(ibw_lasterr, "insufficient wr chunks\n");
961                 return -1;
962         }
963
964         DLIST_REMOVE(pconn->wr_list_avail, p);
965         DLIST_ADD(pconn->wr_list_used, p);
966
967         if (n + sizeof(long) <= pctx->opts.avg_send_size) {
968                 *buf = (void *)(p->msg + sizeof(long));
969                 *key = (void *)p;
970         } else {
971                 p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
972                 if (!p->msg_large) {
973                         sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
974                         DEBUG(0, (ibw_lasterr));
975                         return -1;
976                 }
977                 *buf = (void *)(p->msg_large + sizeof(long));
978         }
979
980         return 0;
981 }
982
983 int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
984 {
985         struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
986         struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
987         struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
988         struct ibv_sge list = {
989                 .addr   = (uintptr_t) NULL,
990                 .length = n,
991                 .lkey   = 0
992         };
993         struct ibv_send_wr wr = {
994                 .wr_id      = p->wr_id,
995                 .sg_list    = &list,
996                 .num_sge    = 1,
997                 .opcode     = IBV_WR_SEND,
998                 .send_flags = IBV_SEND_SIGNALED,
999         };
1000         struct ibv_send_wr *bad_wr;
1001
1002         if (n + sizeof(uint32_t)<=pctx->opts.avg_send_size) {
1003                 assert((p->msg + sizeof(long))==(char *)buf);
1004                 list.lkey = pconn->mr_send->lkey;
1005                 list.addr = (uintptr_t) p->msg;
1006
1007                 *((uint32_t *)p->msg) = htonl(n);
1008         } else {
1009                 assert((p->msg_large + sizeof(long))==(char *)buf);
1010                 assert(p->mr_large!=NULL);
1011                 list.lkey = p->mr_large->lkey;
1012                 list.addr = (uintptr_t) p->msg_large;
1013
1014                 *((uint32_t *)p->msg_large) = htonl(n);
1015         }
1016
1017         return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
1018 }
1019
1020 const char *ibw_getLastError(void)
1021 {
1022         return ibw_lasterr;
1023 }