// SPDX-License-Identifier: GPL-2.0
/*
 * Copyright (c) 2016-2018 Oracle. All rights reserved.
 *
 * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
 */

#include <rdma/rw.h>

#include <linux/sunrpc/xdr.h>
#include <linux/sunrpc/rpc_rdma.h>
#include <linux/sunrpc/svc_rdma.h>

#include "xprt_rdma.h"
#include <trace/events/rpcrdma.h>
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
/* Each R/W context contains state for one chain of RDMA Read or
 * Write Work Requests.
 *
 * Each WR chain handles a single contiguous server-side buffer,
 * because scatterlist entries after the first have to start on
 * page alignment. xdr_buf iovecs cannot guarantee alignment.
 *
 * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
 * from a client may contain a unique R_key, so each WR chain moves
 * up to one segment at a time.
 *
 * The scatterlist makes this data structure over 4KB in size. To
 * make it less likely to fail, and to handle the allocation for
 * smaller I/O requests without disabling bottom-halves, these
 * contexts are created on demand, but cached and reused until the
 * controlling svcxprt_rdma is destroyed.
 */
struct svc_rdma_rw_ctxt {
	struct llist_node	rw_node;
	struct list_head	rw_list;
	struct rdma_rw_ctx	rw_ctx;
	unsigned int		rw_nents;
	unsigned int		rw_first_sgl_nents;
	struct sg_table		rw_sg_table;
	struct scatterlist	rw_first_sgl[];
};
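
/* Example of the lifecycle described above (an illustrative sketch,
 * not a caller that exists in this file): a path needing an SGL for
 * a payload spanning up to three pages would do something like
 *
 *	struct svc_rdma_rw_ctxt *ctxt;
 *
 *	ctxt = svc_rdma_get_rw_ctxt(rdma, 3);
 *	if (!ctxt)
 *		return -ENOMEM;
 *	(fill ctxt->rw_sg_table.sgl and set ctxt->rw_nents)
 *	svc_rdma_put_rw_ctxt(rdma, ctxt);
 *
 * The first call allocates the context with kmalloc_node(); every
 * put returns it to rdma->sc_rw_ctxts for reuse.
 */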
static inline struct svc_rdma_rw_ctxt *
svc_rdma_next_ctxt(struct list_head *list)
{
	return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
					rw_list);
}
static struct svc_rdma_rw_ctxt *
svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
{
	struct ib_device *dev = rdma->sc_cm_id->device;
	unsigned int first_sgl_nents = dev->attrs.max_send_sge;
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	spin_lock(&rdma->sc_rw_ctxt_lock);
	node = llist_del_first(&rdma->sc_rw_ctxts);
	spin_unlock(&rdma->sc_rw_ctxt_lock);
	if (node) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
	} else {
		ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
				    GFP_KERNEL, ibdev_to_node(dev));
		if (!ctxt)
			goto out_noctx;

		INIT_LIST_HEAD(&ctxt->rw_list);
		ctxt->rw_first_sgl_nents = first_sgl_nents;
	}

	ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
	if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
				   ctxt->rw_sg_table.sgl,
				   first_sgl_nents))
		goto out_free;
	return ctxt;

out_free:
	kfree(ctxt);
out_noctx:
	trace_svcrdma_rwctx_empty(rdma, sges);
	return NULL;
}
static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
				   struct llist_head *list)
{
	sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
	llist_add(&ctxt->rw_node, list);
}
static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
				 struct svc_rdma_rw_ctxt *ctxt)
{
	__svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
}
/**
 * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
 * @rdma: transport about to be destroyed
 */
void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
{
	struct svc_rdma_rw_ctxt *ctxt;
	struct llist_node *node;

	while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
		ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
		kfree(ctxt);
	}
}
/**
 * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
 * @rdma: controlling transport instance
 * @ctxt: R/W context to prepare
 * @offset: RDMA offset
 * @handle: RDMA tag/handle
 * @direction: I/O direction
 *
 * On success, returns the number of WQEs that will be needed
 * on the work queue; otherwise a negative errno.
 */
static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
				struct svc_rdma_rw_ctxt *ctxt,
				u64 offset, u32 handle,
				enum dma_data_direction direction)
{
	int ret;

	ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
			       ctxt->rw_sg_table.sgl, ctxt->rw_nents,
			       0, offset, handle, direction);
	if (unlikely(ret < 0)) {
		trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
					     ctxt->rw_nents, ret);
		svc_rdma_put_rw_ctxt(rdma, ctxt);
	}
	return ret;
}
/**
 * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be initialized
 */
void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
		      struct svc_rdma_chunk_ctxt *cc)
{
	struct rpc_rdma_cid *cid = &cc->cc_cid;

	if (unlikely(!cid->ci_completion_id))
		svc_rdma_send_cid_init(rdma, cid);

	INIT_LIST_HEAD(&cc->cc_rwctxts);
	cc->cc_sqecount = 0;
}
/**
 * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
 * @rdma: controlling transport instance
 * @cc: svc_rdma_chunk_ctxt to be released
 * @dir: DMA direction
 */
void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
			 struct svc_rdma_chunk_ctxt *cc,
			 enum dma_data_direction dir)
{
	struct llist_node *first, *last;
	struct svc_rdma_rw_ctxt *ctxt;
	LLIST_HEAD(free);

	trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);

	first = last = NULL;
	while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
		list_del(&ctxt->rw_list);

		rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
				    rdma->sc_port_num, ctxt->rw_sg_table.sgl,
				    ctxt->rw_nents, dir);
		__svc_rdma_put_rw_ctxt(ctxt, &free);

		ctxt->rw_node.next = first;
		first = &ctxt->rw_node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &rdma->sc_rw_ctxts);
}
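
/* Note on the recycling loop above: the released contexts are strung
 * into a singly-linked chain through their rw_node pointers,
 *
 *	first -> ctxt[N] -> ... -> ctxt[0] (== last)
 *
 * so a single llist_add_batch() splices the entire chain back onto
 * rdma->sc_rw_ctxts, paying one atomic cmpxchg per WR chain instead
 * of one per context.
 */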
static struct svc_rdma_write_info *
svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
			  const struct svc_rdma_chunk *chunk)
{
	struct svc_rdma_write_info *info;

	info = kzalloc_node(sizeof(*info), GFP_KERNEL,
			    ibdev_to_node(rdma->sc_cm_id->device));
	if (!info)
		return info;

	info->wi_rdma = rdma;
	info->wi_chunk = chunk;
	svc_rdma_cc_init(rdma, &info->wi_cc);
	info->wi_cc.cc_cqe.done = svc_rdma_write_done;
	return info;
}
static void svc_rdma_write_info_free_async(struct work_struct *work)
{
	struct svc_rdma_write_info *info;

	info = container_of(work, struct svc_rdma_write_info, wi_work);
	svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
	kfree(info);
}
static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
{
	INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
	queue_work(svcrdma_wq, &info->wi_work);
}
/**
 * svc_rdma_write_chunk_release - Release Write chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;

	while (!list_empty(&ctxt->sc_write_info_list)) {
		info = list_first_entry(&ctxt->sc_write_info_list,
					struct svc_rdma_write_info, wi_list);
		list_del(&info->wi_list);

		cc = &info->wi_cc;
		svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
		svc_rdma_write_info_free(info);
	}
}
/**
 * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
 * @rdma: controlling transport
 * @ctxt: Send context that is being released
 */
void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
				  struct svc_rdma_send_ctxt *ctxt)
{
	struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;

	if (!cc->cc_sqecount)
		return;
	svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
}
/**
 * svc_rdma_reply_done - Reply chunk Write completion handler
 * @cq: controlling Completion Queue
 * @wc: Work Completion report
 *
 * Pages under I/O are released by a subsequent Send completion.
 */
static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svcxprt_rdma *rdma = cq->cq_context;

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_reply(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
	}

	svc_xprt_deferred_close(&rdma->sc_xprt);
}
/**
 * svc_rdma_write_done - Write chunk completion
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 *
 * Pages under I/O are freed by a subsequent Send completion.
 */
static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);

	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_write(&cc->cc_cid);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
	}

	/* The RDMA Write has flushed, so the client won't get
	 * some of the outgoing RPC message. Signal the loss
	 * to the client by closing the connection.
	 */
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
/**
 * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
 * @cq: controlling Completion Queue
 * @wc: Work Completion
 */
static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
{
	struct svcxprt_rdma *rdma = cq->cq_context;
	struct ib_cqe *cqe = wc->wr_cqe;
	struct svc_rdma_chunk_ctxt *cc =
		container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
	struct svc_rdma_recv_ctxt *ctxt;

	svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);

	ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
	switch (wc->status) {
	case IB_WC_SUCCESS:
		trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
				      cc->cc_posttime);

		spin_lock(&rdma->sc_rq_dto_lock);
		list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
		/* the unlock pairs with the smp_rmb in svc_xprt_ready */
		set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
		spin_unlock(&rdma->sc_rq_dto_lock);
		svc_xprt_enqueue(&rdma->sc_xprt);
		return;
	case IB_WC_WR_FLUSH_ERR:
		trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
		break;
	default:
		trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
	}

	/* The RDMA Read has flushed, so the incoming RPC message
	 * cannot be constructed and must be dropped. Signal the
	 * loss to the client by closing the connection.
	 */
	svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
	svc_rdma_recv_ctxt_put(rdma, ctxt);
	svc_xprt_deferred_close(&rdma->sc_xprt);
}
/* This function sleeps when the transport's Send Queue is congested.
 *
 * Assumptions:
 * - If ib_post_send() succeeds, only one completion is expected,
 *   even if one or more WRs are flushed. This is true when posting
 *   an rdma_rw_ctx or when posting a single signaled WR.
 */
static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
				    struct svc_rdma_chunk_ctxt *cc)
{
	struct ib_send_wr *first_wr;
	const struct ib_send_wr *bad_wr;
	struct list_head *tmp;
	struct ib_cqe *cqe;
	int ret;

	might_sleep();

	if (cc->cc_sqecount > rdma->sc_sq_depth)
		return -EINVAL;

	ret = 0;
	cqe = &cc->cc_cqe;
	first_wr = NULL;
	list_for_each(tmp, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *ctxt;

		ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}

	do {
		if (atomic_sub_return(cc->cc_sqecount,
				      &rdma->sc_sq_avail) > 0) {
			cc->cc_posttime = ktime_get();
			ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
			if (ret)
				break;
			return 0;
		}

		percpu_counter_inc(&svcrdma_stat_sq_starve);
		trace_svcrdma_sq_full(rdma, &cc->cc_cid);
		atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
		trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
	} while (1);

	trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
	svc_xprt_deferred_close(&rdma->sc_xprt);

	/* If even one was posted, there will be a completion. */
	if (bad_wr != first_wr)
		return 0;

	atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
	wake_up(&rdma->sc_send_wait);
	return -ENOTCONN;
}
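
/* A worked example of the Send Queue accounting above (values are
 * illustrative): with sc_sq_avail at 8 and cc_sqecount at 10,
 * atomic_sub_return() yields -2, so the WRs cannot be posted; the
 * credits are returned via atomic_add() and the task sleeps in
 * wait_event() until Send completions push sc_sq_avail above 10,
 * then the reservation is retried.
 */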
/* Build and DMA-map an SGL that covers one kvec in an xdr_buf
 */
static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
			       unsigned int len,
			       struct svc_rdma_rw_ctxt *ctxt)
{
	struct scatterlist *sg = ctxt->rw_sg_table.sgl;

	sg_set_buf(&sg[0], info->wi_base, len);
	info->wi_base += len;

	ctxt->rw_nents = 1;
}
/* Build and DMA-map an SGL that covers part of an xdr_buf's pagelist.
 */
static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
				    unsigned int remaining,
				    struct svc_rdma_rw_ctxt *ctxt)
{
	unsigned int sge_no, sge_bytes, page_off, page_no;
	const struct xdr_buf *xdr = info->wi_xdr;
	struct scatterlist *sg;
	struct page **page;

	page_off = info->wi_next_off + xdr->page_base;
	page_no = page_off >> PAGE_SHIFT;
	page_off = offset_in_page(page_off);
	page = xdr->pages + page_no;
	info->wi_next_off += remaining;
	sg = ctxt->rw_sg_table.sgl;
	sge_no = 0;
	do {
		sge_bytes = min_t(unsigned int, remaining,
				  PAGE_SIZE - page_off);
		sg_set_page(sg, *page, sge_bytes, page_off);

		remaining -= sge_bytes;
		sg = sg_next(sg);
		page_off = 0;
		page++;
		sge_no++;
	} while (remaining);

	ctxt->rw_nents = sge_no;
}
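
/* A worked example of the arithmetic above (illustrative values,
 * assuming 4KB pages): with xdr->page_base = 100 and wi_next_off =
 * 8092, page_off starts at 8192, so page_no = 8192 >> PAGE_SHIFT = 2
 * and page_off = offset_in_page(8192) = 0: the SGL begins at byte 0
 * of the third page in xdr->pages.
 */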
/* Construct RDMA Write WRs to send a portion of an xdr_buf containing
 * an RPC Reply.
 */
static int
svc_rdma_build_writes(struct svc_rdma_write_info *info,
		      void (*constructor)(struct svc_rdma_write_info *info,
					  unsigned int len,
					  struct svc_rdma_rw_ctxt *ctxt),
		      unsigned int remaining)
{
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct svcxprt_rdma *rdma = info->wi_rdma;
	const struct svc_rdma_segment *seg;
	struct svc_rdma_rw_ctxt *ctxt;
	int ret;

	do {
		unsigned int write_len;
		u64 offset;

		if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
			goto out_overflow;

		seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
		write_len = min(remaining, seg->rs_length - info->wi_seg_off);
		if (!write_len)
			goto out_overflow;
		ctxt = svc_rdma_get_rw_ctxt(rdma,
					    (write_len >> PAGE_SHIFT) + 2);
		if (!ctxt)
			return -ENOMEM;

		constructor(info, write_len, ctxt);
		offset = seg->rs_offset + info->wi_seg_off;
		ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
					   DMA_TO_DEVICE);
		if (ret < 0)
			return -EIO;
		percpu_counter_inc(&svcrdma_stat_write);

		list_add(&ctxt->rw_list, &cc->cc_rwctxts);
		cc->cc_sqecount += ret;
		if (write_len == seg->rs_length - info->wi_seg_off) {
			info->wi_seg_no++;
			info->wi_seg_off = 0;
		} else {
			info->wi_seg_off += write_len;
		}
		remaining -= write_len;
	} while (remaining);

	return 0;

out_overflow:
	trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
				     info->wi_chunk->ch_segcount);
	return -E2BIG;
}
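
/* An illustration of the segment bookkeeping above (values assumed):
 * writing 12KB into a chunk whose first segment is 8KB consumes all
 * of segment 0 (wi_seg_no advances, wi_seg_off resets to 0), then
 * 4KB of segment 1 (wi_seg_off becomes 4096). Each iteration posts
 * a separate rdma_rw_ctx because each segment can carry its own
 * R_key.
 */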
/**
 * svc_rdma_iov_write - Construct RDMA Writes from an iov
 * @info: pointer to write arguments
 * @iov: kvec to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
			      const struct kvec *iov)
{
	info->wi_base = iov->iov_base;
	return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
				     iov->iov_len);
}
/**
 * svc_rdma_pages_write - Construct RDMA Writes from pages
 * @info: pointer to write arguments
 * @xdr: xdr_buf with pages to write
 * @offset: offset into the content of @xdr
 * @length: number of bytes to write
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
				const struct xdr_buf *xdr,
				unsigned int offset,
				unsigned long length)
{
	info->wi_xdr = xdr;
	info->wi_next_off = offset - xdr->head[0].iov_len;
	return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
				     length);
}
/**
 * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
 * @xdr: xdr_buf to write
 * @data: pointer to write arguments
 *
 * Returns:
 *   On success, returns zero
 *   %-E2BIG if the client-provided Write chunk is too small
 *   %-ENOMEM if a resource has been exhausted
 *   %-EIO if an rdma-rw error occurred
 */
static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
{
	struct svc_rdma_write_info *info = data;
	int ret;

	if (xdr->head[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->head[0]);
		if (ret < 0)
			return ret;
	}

	if (xdr->page_len) {
		ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
					   xdr->page_len);
		if (ret < 0)
			return ret;
	}

	if (xdr->tail[0].iov_len) {
		ret = svc_rdma_iov_write(info, &xdr->tail[0]);
		if (ret < 0)
			return ret;
	}

	return xdr->len;
}
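
/* svc_rdma_xb_write() is shaped as an xdr_buf processor callback.
 * For example, the Reply path below hands it to
 * pcl_process_nonpayloads() to Write every part of the message that
 * is not a chunk payload:
 *
 *	ret = pcl_process_nonpayloads(write_pcl, xdr,
 *				      svc_rdma_xb_write, info);
 */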
/* Link Write WRs for @chunk onto @sctxt's WR chain.
 */
static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
					struct svc_rdma_send_ctxt *sctxt,
					const struct svc_rdma_chunk *chunk,
					const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info;
	struct svc_rdma_chunk_ctxt *cc;
	struct ib_send_wr *first_wr;
	struct xdr_buf payload;
	struct list_head *pos;
	struct ib_cqe *cqe;
	int ret;

	if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
			       chunk->ch_payload_length))
		return -EMSGSIZE;

	info = svc_rdma_write_info_alloc(rdma, chunk);
	if (!info)
		return -ENOMEM;
	cc = &info->wi_cc;

	ret = svc_rdma_xb_write(&payload, info);
	if (ret != payload.len)
		goto out_err;

	ret = -EINVAL;
	if (unlikely(cc->cc_sqecount > rdma->sc_sq_depth))
		goto out_err;

	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;
	list_add(&info->wi_list, &sctxt->sc_write_info_list);

	trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
	return 0;

out_err:
	svc_rdma_write_info_free(info);
	return ret;
}
/**
 * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
 * @rdma: controlling RDMA transport
 * @write_pcl: Write list provisioned by the client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply message
 *
 * Returns zero on success, or a negative errno if one or more
 * Write chunks could not be sent.
 */
int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
				const struct svc_rdma_pcl *write_pcl,
				struct svc_rdma_send_ctxt *sctxt,
				const struct xdr_buf *xdr)
{
	struct svc_rdma_chunk *chunk;
	int ret;

	pcl_for_each_chunk(chunk, write_pcl) {
		if (!chunk->ch_payload_length)
			break;
		ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
		if (ret < 0)
			return ret;
	}
	return 0;
}
/**
 * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
 * @rdma: controlling RDMA transport
 * @write_pcl: Write chunk list provided by client
 * @reply_pcl: Reply chunk provided by client
 * @sctxt: Send WR resources
 * @xdr: xdr_buf containing an RPC Reply
 *
 * Returns a non-negative number of bytes the chunk consumed, or
 *   %-E2BIG if the payload was larger than the Reply chunk,
 *   %-EINVAL if client provided too many segments,
 *   %-ENOMEM if rdma_rw context pool was exhausted,
 *   %-ENOTCONN if posting failed (connection is lost),
 *   %-EIO if rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
				 const struct svc_rdma_pcl *write_pcl,
				 const struct svc_rdma_pcl *reply_pcl,
				 struct svc_rdma_send_ctxt *sctxt,
				 const struct xdr_buf *xdr)
{
	struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
	struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
	struct ib_send_wr *first_wr;
	struct list_head *pos;
	struct ib_cqe *cqe;
	int ret;

	info->wi_rdma = rdma;
	info->wi_chunk = pcl_first_chunk(reply_pcl);
	info->wi_seg_off = 0;
	info->wi_seg_no = 0;
	info->wi_cc.cc_cqe.done = svc_rdma_reply_done;

	ret = pcl_process_nonpayloads(write_pcl, xdr,
				      svc_rdma_xb_write, info);
	if (ret < 0)
		return ret;

	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each(pos, &cc->cc_rwctxts) {
		struct svc_rdma_rw_ctxt *rwc;

		rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;
	}
	sctxt->sc_wr_chain = first_wr;
	sctxt->sc_sqecount += cc->cc_sqecount;

	trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
	return xdr->len;
}
/**
 * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @segment: co-ordinates of remote memory to be read
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough rq_pages to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
				       struct svc_rdma_recv_ctxt *head,
				       const struct svc_rdma_segment *segment)
{
	struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	unsigned int sge_no, seg_len, len;
	struct svc_rdma_rw_ctxt *ctxt;
	struct scatterlist *sg;
	int ret;

	len = segment->rs_length;
	sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
	ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no);
	if (!ctxt)
		return -ENOMEM;
	ctxt->rw_nents = sge_no;

	sg = ctxt->rw_sg_table.sgl;
	for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
		seg_len = min_t(unsigned int, len,
				PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
			    seg_len, head->rc_pageoff);
		sg = sg_next(sg);

		head->rc_pageoff += seg_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		len -= seg_len;

		if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
			goto out_overrun;
	}

	ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
				   segment->rs_handle, DMA_FROM_DEVICE);
	if (ret < 0)
		return -EIO;
	percpu_counter_inc(&svcrdma_stat_read);

	list_add(&ctxt->rw_list, &cc->cc_rwctxts);
	cc->cc_sqecount += ret;
	return 0;

out_overrun:
	trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
	return -EINVAL;
}
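
/* A worked example of the SGE estimate above (illustrative values,
 * 4KB pages): reading an 8000-byte segment when rc_pageoff is 3000
 * touches bytes 3000..10999 of the sink buffer, so
 * PAGE_ALIGN(3000 + 8000) >> PAGE_SHIFT = 3, and the loop fills
 * three SGEs of 1096, 4096, and 2808 bytes.
 */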
/**
 * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: Read chunk to pull
 *
 * Return values:
 *   %0: the Read WR chain was constructed successfully
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: allocating local resources failed
 *   %-EIO: a DMA mapping error occurred
 */
static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		ret = svc_rdma_build_read_segment(rqstp, head, segment);
		if (ret < 0)
			break;
		head->rc_readbytes += segment->rs_length;
	}
	return ret;
}
/**
 * svc_rdma_copy_inline_range - Copy part of the inline content into pages
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @offset: offset into the Receive buffer of region to copy
 * @remaining: length of region to copy
 *
 * Take a page at a time from rqstp->rq_pages and copy the inline
 * content from the Receive buffer into that page. Update
 * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
 * result will land contiguously with the copied content.
 *
 * Return values:
 *   %0: Inline content was successfully copied
 *   %-EINVAL: offset or length was incorrect
 */
static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head,
				      unsigned int offset,
				      unsigned int remaining)
{
	unsigned char *dst, *src = head->rc_recv_buf;
	unsigned int page_no, numpages;

	numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
	for (page_no = 0; page_no < numpages; page_no++) {
		unsigned int page_len;

		page_len = min_t(unsigned int, remaining,
				 PAGE_SIZE - head->rc_pageoff);

		if (!head->rc_pageoff)
			head->rc_page_count++;

		dst = page_address(rqstp->rq_pages[head->rc_curpage]);
		memcpy(dst + head->rc_pageoff, src + offset, page_len);

		head->rc_readbytes += page_len;
		head->rc_pageoff += page_len;
		if (head->rc_pageoff == PAGE_SIZE) {
			head->rc_curpage++;
			head->rc_pageoff = 0;
		}
		remaining -= page_len;
		offset += page_len;
	}

	return 0;
}
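
/* An illustration of the copy above (assumed values, 4KB pages):
 * copying 5000 bytes of inline content with rc_pageoff at 0 spans
 * PAGE_ALIGN(5000) >> PAGE_SHIFT = 2 pages: 4096 bytes fill
 * rq_pages[rc_curpage] and 904 bytes land at the start of the next
 * page, leaving rc_pageoff at 904 so a subsequent RDMA Read result
 * lands contiguously.
 */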
/**
 * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
 * like an incoming TCP call.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int
svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
			      struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = head->rc_byte_len - start;
	return svc_rdma_copy_inline_range(rqstp, head, start, length);
}
/**
 * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The chunk data lands in the page list of rqstp->rq_arg.pages.
 *
 * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
 * Therefore, XDR round-up of the Read chunk and trailing
 * inline content must both be added at the end of the pagelist.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
				   struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_build_read_chunk(rqstp, head,
					 pcl_first_chunk(&head->rc_read_pcl));
}
/**
 * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 * @chunk: parsed Call chunk to pull
 * @offset: offset of region to pull
 * @length: length of region to pull
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
				     struct svc_rdma_recv_ctxt *head,
				     const struct svc_rdma_chunk *chunk,
				     unsigned int offset, unsigned int length)
{
	const struct svc_rdma_segment *segment;
	int ret;

	ret = -EINVAL;
	pcl_for_each_segment(segment, chunk) {
		struct svc_rdma_segment dummy;

		if (offset > segment->rs_length) {
			offset -= segment->rs_length;
			continue;
		}

		dummy.rs_handle = segment->rs_handle;
		dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
		dummy.rs_offset = segment->rs_offset + offset;

		ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
		if (ret < 0)
			break;

		head->rc_readbytes += dummy.rs_length;
		length -= dummy.rs_length;
		offset = 0;
	}
	return ret;
}
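
/* An illustration of the skip logic above (assumed values): pulling
 * a region starting at offset 5000 from a chunk built of two
 * 4096-byte segments skips segment 0 entirely (offset becomes 904)
 * and begins the Read 904 bytes into segment 1, at dummy.rs_offset =
 * segment 1's rs_offset + 904, with dummy.rs_length capped by the
 * min_t() expression.
 */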
/**
 * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: there were not enough resources to finish
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
				    struct svc_rdma_recv_ctxt *head)
{
	const struct svc_rdma_chunk *call_chunk =
		pcl_first_chunk(&head->rc_call_pcl);
	const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
	struct svc_rdma_chunk *chunk, *next;
	unsigned int start, length;
	int ret;

	if (pcl_is_empty(pcl))
		return svc_rdma_build_read_chunk(rqstp, head, call_chunk);

	start = 0;
	chunk = pcl_first_chunk(pcl);
	length = chunk->ch_position;
	ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					start, length);
	if (ret < 0)
		return ret;

	pcl_for_each_chunk(chunk, pcl) {
		ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
		if (ret < 0)
			return ret;

		next = pcl_next_chunk(pcl, chunk);
		if (!next)
			break;

		start += length;
		length = next->ch_position - head->rc_readbytes;
		ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
						start, length);
		if (ret < 0)
			return ret;
	}

	start += length;
	length = call_chunk->ch_length - start;
	return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
					 start, length);
}
/**
 * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
 * @rqstp: RPC transaction context
 * @head: context for ongoing I/O
 *
 * The start of the data lands in the first page just after the
 * Transport header, and the rest lands in rqstp->rq_arg.pages.
 *
 * Assumptions:
 *	- A PZRC is never sent in an RDMA_MSG message, though it's
 *	  allowed by spec.
 *
 * Return values:
 *   %0: RDMA Read WQEs were successfully built
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
					  struct svc_rdma_recv_ctxt *head)
{
	return svc_rdma_read_call_chunk(rqstp, head);
}
/* Pages under I/O have been copied to head->rc_pages. Ensure that
 * svc_xprt_release() does not put them when svc_rdma_recvfrom()
 * returns. This has to be done after all Read WRs are constructed
 * to properly handle a page that happens to be part of I/O on behalf
 * of two different RDMA segments.
 *
 * Note: if the subsequent post_send fails, these pages have already
 * been moved to head->rc_pages and thus will be cleaned up by
 * svc_rdma_recv_ctxt_put().
 */
static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
				      struct svc_rdma_recv_ctxt *head)
{
	unsigned int i;

	for (i = 0; i < head->rc_page_count; i++) {
		head->rc_pages[i] = rqstp->rq_pages[i];
		rqstp->rq_pages[i] = NULL;
	}
}
/**
 * svc_rdma_process_read_list - Pull list of Read chunks from the client
 * @rdma: controlling RDMA transport
 * @rqstp: set of pages to use as Read sink buffers
 * @head: pages under I/O collect here
 *
 * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
 * pull each Read chunk as they decode an incoming RPC message.
 *
 * On Linux, however, the server needs to have a fully-constructed RPC
 * message in rqstp->rq_arg when there is a positive return code from
 * ->xpo_recvfrom. So the Read list is safety-checked immediately when
 * it is received, then here the whole Read list is pulled all at once.
 * The ingress RPC message is fully reconstructed once all associated
 * RDMA Reads have completed.
 *
 * Return values:
 *   %1: all needed RDMA Reads were posted successfully,
 *   %-EINVAL: client provided too many chunks or segments,
 *   %-ENOMEM: rdma_rw context pool was exhausted,
 *   %-ENOTCONN: posting failed (connection is lost),
 *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
 */
int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
			       struct svc_rqst *rqstp,
			       struct svc_rdma_recv_ctxt *head)
{
	struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
	int ret;

	cc->cc_cqe.done = svc_rdma_wc_read_done;
	cc->cc_sqecount = 0;
	head->rc_pageoff = 0;
	head->rc_curpage = 0;
	head->rc_readbytes = 0;

	if (pcl_is_empty(&head->rc_call_pcl)) {
		if (head->rc_read_pcl.cl_count == 1)
			ret = svc_rdma_read_data_item(rqstp, head);
		else
			ret = svc_rdma_read_multiple_chunks(rqstp, head);
	} else
		ret = svc_rdma_read_special(rqstp, head);
	svc_rdma_clear_rqst_pages(rqstp, head);
	if (ret < 0)
		return ret;

	trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
	ret = svc_rdma_post_chunk_ctxt(rdma, cc);
	return ret < 0 ? ret : 1;
}
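
/* Example of the calling convention (an illustrative sketch; the
 * real caller is the transport's ->xpo_recvfrom path):
 *
 *	ret = svc_rdma_process_read_list(rdma, rqstp, ctxt);
 *	if (ret < 0)
 *		(close the connection and drop the message)
 *
 * A return of %1 means the Reads were posted: svc_rdma_wc_read_done()
 * later queues the completed message on sc_read_complete_q and
 * re-enqueues the transport so the RPC can be processed.
 */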