Modified send logic to allow large messages.
author Peter Somogyi <psomogyi@gamax.hu>
Mon, 18 Dec 2006 19:52:49 +0000 (20:52 +0100)
committer Peter Somogyi <psomogyi@gamax.hu>
Mon, 18 Dec 2006 19:52:49 +0000 (20:52 +0100)
TODO: receiver side.

ib/ibwrapper.c
ib/ibwrapper.h
ib/ibwrapper_internal.h
tests/ibwrapper_test.c

index b70b6caad6b8727d214fd99bd0de3c927083bbec..c04505bc474254effef32d5c3708e9afae885c80 100644 (file)
@@ -50,6 +50,37 @@ static void ibw_event_handler_verbs(struct event_context *ev,
        struct fd_event *fde, uint16_t flags, void *private_data);
 static int ibw_fill_cq(struct ibw_conn *conn);
 
+static void *ibw_alloc_mr(struct ibw_ctx_priv *pctx, struct ibw_conn_priv *pconn,
+       int n, struct ibv_mr **ppmr)
+{ /* Allocate an n-byte buffer and register it as a memory region; returns the buffer, or NULL with a message written to ibw_lasterr. pconn is not used in this body. */
+       void *buf;
+       buf = memalign(pctx->pagesize, n); /* alignment taken from pctx->pagesize */
+       if (!buf) {
+               sprintf(ibw_lasterr, "couldn't allocate memory\n");
+               return NULL; /* *ppmr is left untouched on this path */
+       }

+       *ppmr = ibv_reg_mr(pctx->pd, buf, n, IBV_ACCESS_LOCAL_WRITE); /* register against pctx->pd; the handle is handed back through ppmr */
+       if (!*ppmr) {
+               sprintf(ibw_lasterr, "couldn't allocate mr\n");
+               free(buf); /* do not leak the buffer when registration fails */
+               return NULL;
+       }

+       return buf; /* presumably released later, together with *ppmr, via ibw_free_mr() - confirm at call sites */
+}
+
+static void ibw_free_mr(char **ppbuf, struct ibv_mr **ppmr)
+{ /* Release a buffer / memory-region pair: deregister *ppmr, free *ppbuf, and reset both to NULL. Either one may already be NULL. */
+       if (*ppmr!=NULL) {
+               ibv_dereg_mr(*ppmr); /* done before the backing memory is freed below; the return value is not checked */
+               *ppmr = NULL;
+       }
+       if (*ppbuf) {
+               free(*ppbuf);
+               *ppbuf = NULL; /* with both pointers cleared, a repeated call does nothing */
+       }
+}
 
 static int ibw_init_memory(struct ibw_conn *conn)
 {
@@ -59,23 +90,26 @@ static int ibw_init_memory(struct ibw_conn *conn)
        int     i;
        struct ibw_wr   *p;
 
-       pconn->buf = memalign(pctx->pagesize, pctx->max_msg_size);
-       if (!pconn->buf) {
-               sprintf(ibw_lasterr, "couldn't allocate work buf\n");
+       pconn->buf_send = ibw_alloc_mr(pctx, pconn,
+               pctx->opts.max_send_wr * pctx->opts.avg_send_size, &pconn->mr_send);
+       if (!pconn->buf_send) {
+               sprintf(ibw_lasterr, "couldn't allocate work send buf\n");
                return -1;
        }
-       pconn->mr = ibv_reg_mr(pctx->pd, pconn->buf,
-               pctx->qsize * pctx->max_msg_size, IBV_ACCESS_LOCAL_WRITE);
-       if (!pconn->mr) {
-               sprintf(ibw_lasterr, "couldn't allocate mr\n");
+
+       pconn->buf_recv = ibw_alloc_mr(pctx, pconn,
+               pctx->opts.max_recv_wr * pctx->opts.recv_bufsize, &pconn->mr_recv);
+       if (!pconn->buf_recv) {
+               sprintf(ibw_lasterr, "couldn't allocate work recv buf\n");
                return -1;
        }
 
-       pconn->wr_index = talloc_size(pconn, pctx->qsize * sizeof(struct ibw_wr *));
+       pconn->wr_index = talloc_size(pconn, pctx->opts.max_send_wr * sizeof(struct ibw_wr *));
+       assert(pconn->wr_index!=NULL);
 
-       for(i=0; i<pctx->qsize; i++) {
+       for(i=0; i<pctx->opts.max_send_wr; i++) {
                p = pconn->wr_index[i] = talloc_zero(pconn, struct ibw_wr);
-               p->msg = pconn->buf + (i * pctx->max_msg_size);
+               p->msg = pconn->buf_send + (i * pctx->opts.avg_send_size);
                p->wr_id = i;
 
                DLIST_ADD(pconn->wr_list_avail, p);
@@ -117,14 +151,8 @@ static int ibw_ctx_destruct(struct ibw_ctx *ctx)
 static int ibw_conn_priv_destruct(struct ibw_conn_priv *pconn)
 {
        /* free memory regions */
-       if (pconn->mr) {
-               ibv_dereg_mr(pconn->mr);
-               pconn->mr = NULL;
-       }
-       if (pconn->buf) {
-               free(pconn->buf); /* memalign-ed */
-               pconn->buf = NULL;
-       }
+       ibw_free_mr(&pconn->buf_send, &pconn->mr_send);
+       ibw_free_mr(&pconn->buf_recv, &pconn->mr_recv);
 
        /* pconn->wr_index is freed by talloc */
        /* pconn->wr_index[i] are freed by talloc */
@@ -204,7 +232,8 @@ static int ibw_setup_cq_qp(struct ibw_conn *conn)
                pconn->verbs_channel->fd, EVENT_FD_READ, ibw_event_handler_verbs, conn);
 
        /* init cq */
-       pconn->cq = ibv_create_cq(pconn->cm_id->verbs, pctx->qsize,
+       pconn->cq = ibv_create_cq(pconn->cm_id->verbs,
+               pctx->opts.max_recv_wr + pctx->opts.max_send_wr,
                conn, pconn->verbs_channel, 0);
        if (pconn->cq==NULL) {
                sprintf(ibw_lasterr, "ibv_create_cq failed\n");
@@ -244,8 +273,8 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
        int     rc;
        struct ibv_sge list = {
                .addr   = (uintptr_t) NULL,
-               .length = pctx->max_msg_size,
-               .lkey   = pconn->mr->lkey
+               .length = pctx->opts.recv_bufsize,
+               .lkey   = pconn->mr_recv->lkey
        };
        struct ibv_recv_wr wr = {
                .wr_id      = 0,
@@ -253,17 +282,10 @@ static int ibw_refill_cq_recv(struct ibw_conn *conn)
                .num_sge    = 1,
        };
        struct ibv_recv_wr *bad_wr;
-       struct ibw_wr   *p = pconn->wr_list_avail;
 
-       if (p==NULL) {
-               sprintf(ibw_lasterr, "out of wr_list_avail");
-               DEBUG(0, (ibw_lasterr));
-               return -1;
-       }
-       DLIST_REMOVE(pconn->wr_list_avail, p);
-       DLIST_ADD(pconn->wr_list_used, p);
-       list.addr = (uintptr_t) p->msg;
-       wr.wr_id = p->wr_id;
+       list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+       wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+       pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
 
        rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
        if (rc) {
@@ -282,8 +304,8 @@ static int ibw_fill_cq(struct ibw_conn *conn)
        int     i, rc;
        struct ibv_sge list = {
                .addr   = (uintptr_t) NULL,
-               .length = pctx->max_msg_size,
-               .lkey   = pconn->mr->lkey
+               .length = pctx->opts.recv_bufsize,
+               .lkey   = pconn->mr_recv->lkey
        };
        struct ibv_recv_wr wr = {
                .wr_id      = 0,
@@ -291,19 +313,11 @@ static int ibw_fill_cq(struct ibw_conn *conn)
                .num_sge    = 1,
        };
        struct ibv_recv_wr *bad_wr;
-       struct ibw_wr   *p;
 
        for(i = pctx->opts.max_recv_wr; i!=0; i--) {
-               p = pconn->wr_list_avail;
-               if (p==NULL) {
-                       sprintf(ibw_lasterr, "out of wr_list_avail");
-                       DEBUG(0, (ibw_lasterr));
-                       return -1;
-               }
-               DLIST_REMOVE(pconn->wr_list_avail, p);
-               DLIST_ADD(pconn->wr_list_used, p);
-               list.addr = (uintptr_t) p->msg;
-               wr.wr_id = p->wr_id;
+               list.addr = (uintptr_t) pconn->buf_recv + pctx->opts.recv_bufsize * pconn->recv_index;
+               wr.wr_id = pctx->opts.max_send_wr + pconn->recv_index;
+               pconn->recv_index = (pconn->recv_index + 1) % pctx->opts.max_recv_wr;
 
                rc = ibv_post_recv(pconn->cm_id->qp, &wr, &bad_wr);
                if (rc) {
@@ -508,8 +522,13 @@ static void ibw_event_handler_verbs(struct event_context *ev,
        
                        DEBUG(10, ("send completion\n"));
                        assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-                       assert(wc.wr_id < pctx->qsize);
+                       assert(wc.wr_id < pctx->opts.max_send_wr);
+
                        p = pconn->wr_index[wc.wr_id];
+                       if (p->msg_large) {
+                               ibw_free_mr(&p->msg_large, &p->mr_large);
+                       }
+
                        DLIST_REMOVE(pconn->wr_list_used, p);
                        DLIST_ADD(pconn->wr_list_avail, p);
                }
@@ -525,19 +544,19 @@ static void ibw_event_handler_verbs(struct event_context *ev,
 
        case IBV_WC_RECV:
                {
-                       struct ibw_wr   *p;
-       
-                       assert(pconn->cm_id->qp->qp_num==wc.qp_num);
-                       assert(wc.wr_id < pctx->qsize);
-                       p = pconn->wr_index[wc.wr_id];
-       
-                       DLIST_REMOVE(pconn->wr_list_used, p);
-                       DLIST_ADD(pconn->wr_list_avail, p);
-       
+                       int     recv_index;
+
                        DEBUG(10, ("recv completion\n"));
-                       assert(wc.byte_len <= pctx->max_msg_size);
-       
-                       pctx->receive_func(conn, p->msg, wc.byte_len);
+                       assert(pconn->cm_id->qp->qp_num==wc.qp_num);
+                       assert((int)wc.wr_id > pctx->opts.max_send_wr);
+                       recv_index = (int)wc.wr_id - pctx->opts.max_send_wr;
+                       assert(recv_index < pctx->opts.max_recv_wr);
+                       assert(wc.byte_len <= pctx->opts.recv_bufsize);
+
+/* TODO: take care of fragmented messages !!! */
+                       pctx->receive_func(conn,
+                               pconn->buf_recv + (recv_index * pctx->opts.recv_bufsize),
+                               wc.byte_len);
                        if (ibw_refill_cq_recv(conn))
                                goto error;
                }
@@ -562,6 +581,8 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
 
        opts->max_send_wr = 256;
        opts->max_recv_wr = 1024;
+       opts->avg_send_size = 1024;
+       opts->recv_bufsize = 256;
 
        for(i=0; i<nattr; i++) {
                name = attr[i].name;
@@ -572,6 +593,10 @@ static int ibw_process_init_attrs(struct ibw_initattr *attr, int nattr, struct i
                        opts->max_send_wr = atoi(value);
                else if (strcmp(name, "max_recv_wr")==0)
                        opts->max_recv_wr = atoi(value);
+               else if (strcmp(name, "avg_send_size")==0)
+                       opts->avg_send_size = atoi(value);
+               else if (strcmp(name, "recv_bufsize")==0)
+                       opts->recv_bufsize = atoi(value);
                else {
                        sprintf(ibw_lasterr, "ibw_init: unknown name %s\n", name);
                        return -1;
@@ -584,8 +609,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
        void *ctx_userdata,
        ibw_connstate_fn_t ibw_connstate,
        ibw_receive_fn_t ibw_receive,
-       struct event_context *ectx,
-       int max_msg_size)
+       struct event_context *ectx)
 {
        struct ibw_ctx *ctx = talloc_zero(NULL, struct ibw_ctx);
        struct ibw_ctx_priv *pctx;
@@ -640,8 +664,6 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
        DEBUG(10, ("created pd %p\n", pctx->pd));
 
        pctx->pagesize = sysconf(_SC_PAGESIZE);
-       pctx->qsize = pctx->opts.max_send_wr + pctx->opts.max_recv_wr;
-       pctx->max_msg_size = max_msg_size;
 
        return ctx;
        /* don't put code here */
@@ -772,8 +794,9 @@ int ibw_disconnect(struct ibw_conn *conn)
        return 0;
 }
 
-int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
+int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n)
 {
+       struct ibw_ctx_priv *pctx = talloc_get_type(conn->ctx->internal, struct ibw_ctx_priv);
        struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
        struct ibw_wr *p = pconn->wr_list_avail;
 
@@ -785,8 +808,18 @@ int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key)
        DLIST_REMOVE(pconn->wr_list_avail, p);
        DLIST_ADD(pconn->wr_list_used, p);
 
-       *buf = (void *)p->msg;
-       *key = (void *)p;
+       if (n + sizeof(long) <= pctx->opts.avg_send_size) {
+               *buf = (void *)(p->msg + sizeof(long));
+               *key = (void *)p;
+       } else {
+               p->msg_large = ibw_alloc_mr(pctx, pconn, n + sizeof(long), &p->mr_large);
+               if (!p->msg_large) {
+                       sprintf(ibw_lasterr, "ibw_alloc_send_buf alloc error\n");
+                       DEBUG(0, (ibw_lasterr));
+                       return -1;
+               }
+               *buf = (void *)(p->msg_large + sizeof(long));
+       }
 
        return 0;
 }
@@ -797,9 +830,9 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
        struct ibw_conn_priv *pconn = talloc_get_type(conn->internal, struct ibw_conn_priv);
        struct ibw_wr *p = talloc_get_type(key, struct ibw_wr);
        struct ibv_sge list = {
-               .addr   = (uintptr_t) p->msg,
+               .addr   = (uintptr_t) NULL,
                .length = n,
-               .lkey   = pconn->mr->lkey
+               .lkey   = 0
        };
        struct ibv_send_wr wr = {
                .wr_id      = p->wr_id,
@@ -810,8 +843,20 @@ int ibw_send(struct ibw_conn *conn, void *buf, void *key, int n)
        };
        struct ibv_send_wr *bad_wr;
 
-       assert(p->msg==(char *)buf);
-       assert(n<=pctx->max_msg_size);
+       if (n + sizeof(long)<=pctx->opts.avg_send_size) {
+               assert((p->msg + sizeof(long))==(char *)buf);
+               list.lkey = pconn->mr_send->lkey;
+               list.addr = (uintptr_t) p->msg;
+
+               *((uint32_t *)p->msg) = htonl(n);
+       } else {
+               assert((p->msg_large + sizeof(long))==(char *)buf);
+               assert(p->mr_large!=NULL);
+               list.lkey = p->mr_large->lkey;
+               list.addr = (uintptr_t) p->msg_large;
+
+               *((uint32_t *)p->msg_large) = htonl(n);
+       }
 
        return ibv_post_send(pconn->cm_id->qp, &wr, &bad_wr);
 }
index 31f7a4f170b05977f0435ac714f87a6eb9518bd6..8934e68d9fb3ad5096d3e93e2fbdb1a68b623a25 100644 (file)
@@ -107,8 +107,7 @@ struct ibw_ctx *ibw_init(struct ibw_initattr *attr, int nattr,
        void *ctx_userdata,
        ibw_connstate_fn_t ibw_connstate,
        ibw_receive_fn_t ibw_receive,
-       struct event_context *ectx,
-       int max_msg_size);
+       struct event_context *ectx);
 
 /*
  * Must be called in states of (IBWS_ERROR, IBWS_READY, IBWS_CONNECT_REQUEST)
@@ -186,7 +185,7 @@ int ibw_disconnect(struct ibw_conn *conn);
  *
  * Returns 0 on success.
  */
-int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key);
+int ibw_alloc_send_buf(struct ibw_conn *conn, void **buf, void **key, int n);
 
 /*
  * Send the message in one
index 04d82f9565de970303196c47828f6e63f7e01606..b819c483d3db82e4ca9964434f2814674ad1dd4d 100644 (file)
 struct ibw_opts {
        int     max_send_wr; /* number of send work requests, i.e. preallocated send slots per connection */
        int     max_recv_wr; /* number of receive buffers posted per connection */
+       int     avg_send_size; /* bytes per preallocated send slot; larger sends get a separately allocated buffer */
+       int     recv_bufsize; /* bytes per posted receive buffer */
 };
 
 struct ibw_wr {
        char    *msg; /* initialized in ibw_init_memory once per connection */
        int     wr_id; /* position in wr_index list; also used as wr id */
+
+       char    *msg_large; /* allocated specially for "large" message */
+       struct ibv_mr *mr_large; /* memory region registered for msg_large; both are released when the send completes */
+
        struct ibw_wr *next, *prev; /* in wr_list_avail or wr_list_used */
 };
 
@@ -48,8 +54,6 @@ struct ibw_ctx_priv {
        ibw_receive_fn_t receive_func; /* see ibw_init */
 
        long    pagesize; /* sysconf result for memalign */
-       int     qsize; /* opts.max_send_wr + opts.max_recv_wr */
-       int     max_msg_size; /* see ibw_init */
 };
 
 struct ibw_conn_priv {
@@ -60,10 +64,16 @@ struct ibw_conn_priv {
        int     is_accepted;
 
        struct ibv_cq   *cq; /* qp is in cm_id */
-       struct ibv_mr *mr;
-       char *buf; /* fixed size (qsize * opts.max_msg_size) buffer for send/recv */
+
+       char *buf_send; /* max_send_wr * avg_send_size */
+       struct ibv_mr *mr_send;
        struct ibw_wr *wr_list_avail;
        struct ibw_wr *wr_list_used;
        struct ibw_wr **wr_index; /* array[0..(max_send_wr-1)] of (ibw_wr *) */
+
+       /* buf_recv is a ring buffer */
+       char *buf_recv; /* max_recv_wr * recv_bufsize */
+       struct ibv_mr *mr_recv;
+       int recv_index; /* index of the next recv buffer */
 };
 
index eb2a5469ff76a03e3316059a29d8b3cdb8d8a8d9..c00d0a747cff120810ea58ed1b4394bddf5ac79f 100644 (file)
@@ -50,7 +50,6 @@ struct ibwtest_ctx {
        struct sockaddr_in *addrs; /* dynamic array of dest addrs */
        int     naddrs;
 
-       int     max_msg_size;
        unsigned int    nsec; /* nanosleep between messages */
 
        int     cnt;
@@ -91,15 +90,15 @@ int ibwtest_send_id(struct ibw_conn *conn)
        struct ibwtest_ctx *tcx = talloc_get_type(conn->ctx->ctx_userdata, struct ibwtest_ctx);
 
        DEBUG(10, ("test IBWC_CONNECTED\n"));
-       if (ibw_alloc_send_buf(conn, (void **)&buf, &key)) {
+       if (ibw_alloc_send_buf(conn, (void **)&buf, &key, strlen(tcx->id)+2)) {
                DEBUG(0, ("send_id: ibw_alloc_send_buf failed\n"));
                return -1;
        }
-       
+
        buf[0] = (char)TESTOP_SEND_ID;
        strcpy(buf+1, tcx->id);
 
-       if (ibw_send(conn, buf, key, strlen(buf+1))) {
+       if (ibw_send(conn, buf, key, strlen(buf+1)+2)) {
                DEBUG(0, ("send_id: ibw_send error\n"));
                return -1;
        }
@@ -111,16 +110,15 @@ int ibwtest_send_test_msg(struct ibwtest_ctx *tcx, struct ibw_conn *conn, const
        char *buf;
        void *key;
 
-       if (ibw_alloc_send_buf(conn, (void **)&buf, &key)) {
+       if (ibw_alloc_send_buf(conn, (void **)&buf, &key, strlen(msg)+2)) {
                fprintf(stderr, "send_test_msg: ibw_alloc_send_buf failed\n");
                return -1;
        }
 
        buf[0] = (char)TESTOP_SEND_DATA;
-       assert(strlen(msg)<tcx->max_msg_size-1);
        strcpy(buf+1, msg);
        
-       if (ibw_send(conn, buf, key, strlen(buf+1))) {
+       if (ibw_send(conn, buf, key, strlen(buf+1)+2)) {
                DEBUG(0, ("send_test_msg: ibw_send error\n"));
                return -1;
        }
@@ -205,7 +203,7 @@ int ibwtest_receive_handler(struct ibw_conn *conn, void *buf, int n)
                char *buf2;
                void *key2;
                /* bounce message */
-               if (ibw_alloc_send_buf(conn, (void **)&buf2, &key2)) {
+               if (ibw_alloc_send_buf(conn, (void **)&buf2, &key2, n)) {
                        fprintf(stderr, "ibw_alloc_send_buf error #2\n");
                        return -1;
                }
@@ -331,11 +329,10 @@ int ibwtest_init_server(struct ibwtest_ctx *tcx)
 void ibwtest_usage(struct ibwtest_ctx *tcx, char *name)
 {
        printf("Usage:\n");
-       printf("\t%s -i <id> -o {name:value} -d {addr:port} -m max_msg_size -t nsec -s\n", name);
+       printf("\t%s -i <id> -o {name:value} -d {addr:port} -t nsec -s\n", name);
        printf("\t-i <id> is a free text, acting as a server id, max 23 chars [mandatory]\n");
        printf("\t-o name1:value1,name2:value2,... is a list of (name, value) pairs\n");
        printf("\t-d addr1:port1,addr2:port2,... is a list of destination ip addresses\n");
-       printf("\t-m max_msg_size maximum message size [default %d]\n", tcx->max_msg_size);
        printf("\t-t nsec delta time between sends in nanosec [default %d]\n", tcx->nsec);
        printf("\t-s server mode (you have to give exactly one -d address:port in this case)\n");
        printf("Press ctrl+C to stop the program.\n");
@@ -350,7 +347,6 @@ int main(int argc, char *argv[])
 
        tcx = talloc_zero(NULL, struct ibwtest_ctx);
        memset(tcx, 0, sizeof(struct ibwtest_ctx));
-       tcx->max_msg_size = 256;
        tcx->nsec = 1000;
 
        /* here is the only case we can't avoid using global... */
@@ -372,9 +368,6 @@ int main(int argc, char *argv[])
                        if (ibwtest_getdests(tcx, op))
                                goto cleanup;
                        break;
-               case 'm':
-                       tcx->max_msg_size = atoi(optarg);
-                       break;
                case 's':
                        tcx->is_server = 1;
                        break;
@@ -396,8 +389,7 @@ int main(int argc, char *argv[])
                tcx,
                ibwtest_connstate_handler,
                ibwtest_receive_handler,
-               ev,
-               tcx->max_msg_size
+               ev
        );
        if (!tcx->ibwctx)
                goto cleanup;