net/sunrpc/xprtrdma/svc_rdma_rw.c
1 // SPDX-License-Identifier: GPL-2.0
2 /*
3  * Copyright (c) 2016-2018 Oracle.  All rights reserved.
4  *
5  * Use the core R/W API to move RPC-over-RDMA Read and Write chunks.
6  */
7
8 #include <rdma/rw.h>
9
10 #include <linux/sunrpc/xdr.h>
11 #include <linux/sunrpc/rpc_rdma.h>
12 #include <linux/sunrpc/svc_rdma.h>
13
14 #include "xprt_rdma.h"
15 #include <trace/events/rpcrdma.h>
16
17 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc);
18 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc);
19
20 /* Each R/W context contains state for one chain of RDMA Read or
21  * Write Work Requests.
22  *
23  * Each WR chain handles a single contiguous server-side buffer,
24  * because scatterlist entries after the first have to start on
25  * page alignment. xdr_buf iovecs cannot guarantee alignment.
26  *
27  * Each WR chain handles only one R_key. Each RPC-over-RDMA segment
28  * from a client may contain a unique R_key, so each WR chain moves
29  * up to one segment at a time.
30  *
31  * The scatterlist makes this data structure over 4KB in size. To
32  * make it less likely to fail, and to handle the allocation for
33  * smaller I/O requests without disabling bottom-halves, these
34  * contexts are created on demand, but cached and reused until the
35  * controlling svcxprt_rdma is destroyed.
36  */
37 struct svc_rdma_rw_ctxt {
38         struct llist_node       rw_node;
39         struct list_head        rw_list;
40         struct rdma_rw_ctx      rw_ctx;
41         unsigned int            rw_nents;
42         unsigned int            rw_first_sgl_nents;
43         struct sg_table         rw_sg_table;
44         struct scatterlist      rw_first_sgl[];
45 };
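/* Editorial sketch (not part of the original file): the typical lifecycle
 * of one R/W context, using the helpers defined below in this file. Error
 * handling is elided; "nr_sges", "offset", "handle", "dir", and "cc" are
 * placeholders for values supplied by the callers.
 */
#if 0
	ctxt = svc_rdma_get_rw_ctxt(rdma, nr_sges);  /* reuse from sc_rw_ctxts, or allocate */
	/* ... fill ctxt->rw_sg_table.sgl and set ctxt->rw_nents ... */
	ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, handle, dir);
	list_add(&ctxt->rw_list, &cc->cc_rwctxts);   /* chain onto this I/O's chunk ctxt */
	/* After the WR chain completes, svc_rdma_cc_release() batches the
	 * context back onto rdma->sc_rw_ctxts for reuse.
	 */
#endif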
46
47 static inline struct svc_rdma_rw_ctxt *
48 svc_rdma_next_ctxt(struct list_head *list)
49 {
50         return list_first_entry_or_null(list, struct svc_rdma_rw_ctxt,
51                                         rw_list);
52 }
53
54 static struct svc_rdma_rw_ctxt *
55 svc_rdma_get_rw_ctxt(struct svcxprt_rdma *rdma, unsigned int sges)
56 {
57         struct ib_device *dev = rdma->sc_cm_id->device;
58         unsigned int first_sgl_nents = dev->attrs.max_send_sge;
59         struct svc_rdma_rw_ctxt *ctxt;
60         struct llist_node *node;
61
62         spin_lock(&rdma->sc_rw_ctxt_lock);
63         node = llist_del_first(&rdma->sc_rw_ctxts);
64         spin_unlock(&rdma->sc_rw_ctxt_lock);
65         if (node) {
66                 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
67         } else {
68                 ctxt = kmalloc_node(struct_size(ctxt, rw_first_sgl, first_sgl_nents),
69                                     GFP_KERNEL, ibdev_to_node(dev));
70                 if (!ctxt)
71                         goto out_noctx;
72
73                 INIT_LIST_HEAD(&ctxt->rw_list);
74                 ctxt->rw_first_sgl_nents = first_sgl_nents;
75         }
76
77         ctxt->rw_sg_table.sgl = ctxt->rw_first_sgl;
78         if (sg_alloc_table_chained(&ctxt->rw_sg_table, sges,
79                                    ctxt->rw_sg_table.sgl,
80                                    first_sgl_nents))
81                 goto out_free;
82         return ctxt;
83
84 out_free:
85         kfree(ctxt);
86 out_noctx:
87         trace_svcrdma_rwctx_empty(rdma, sges);
88         return NULL;
89 }
90
91 static void __svc_rdma_put_rw_ctxt(struct svc_rdma_rw_ctxt *ctxt,
92                                    struct llist_head *list)
93 {
94         sg_free_table_chained(&ctxt->rw_sg_table, ctxt->rw_first_sgl_nents);
95         llist_add(&ctxt->rw_node, list);
96 }
97
98 static void svc_rdma_put_rw_ctxt(struct svcxprt_rdma *rdma,
99                                  struct svc_rdma_rw_ctxt *ctxt)
100 {
101         __svc_rdma_put_rw_ctxt(ctxt, &rdma->sc_rw_ctxts);
102 }
103
104 /**
105  * svc_rdma_destroy_rw_ctxts - Free accumulated R/W contexts
106  * @rdma: transport about to be destroyed
107  *
108  */
109 void svc_rdma_destroy_rw_ctxts(struct svcxprt_rdma *rdma)
110 {
111         struct svc_rdma_rw_ctxt *ctxt;
112         struct llist_node *node;
113
114         while ((node = llist_del_first(&rdma->sc_rw_ctxts)) != NULL) {
115                 ctxt = llist_entry(node, struct svc_rdma_rw_ctxt, rw_node);
116                 kfree(ctxt);
117         }
118 }
119
120 /**
121  * svc_rdma_rw_ctx_init - Prepare a R/W context for I/O
122  * @rdma: controlling transport instance
123  * @ctxt: R/W context to prepare
124  * @offset: RDMA offset
125  * @handle: RDMA tag/handle
126  * @direction: I/O direction
127  *
 128  * On success, returns the number of WQEs that will be needed
 129  * on the work queue; otherwise, returns a negative errno.
130  */
131 static int svc_rdma_rw_ctx_init(struct svcxprt_rdma *rdma,
132                                 struct svc_rdma_rw_ctxt *ctxt,
133                                 u64 offset, u32 handle,
134                                 enum dma_data_direction direction)
135 {
136         int ret;
137
138         ret = rdma_rw_ctx_init(&ctxt->rw_ctx, rdma->sc_qp, rdma->sc_port_num,
139                                ctxt->rw_sg_table.sgl, ctxt->rw_nents,
140                                0, offset, handle, direction);
141         if (unlikely(ret < 0)) {
142                 trace_svcrdma_dma_map_rw_err(rdma, offset, handle,
143                                              ctxt->rw_nents, ret);
144                 svc_rdma_put_rw_ctxt(rdma, ctxt);
145         }
146         return ret;
147 }
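/* Editorial note (not part of the original file): the non-negative value
 * returned by rdma_rw_ctx_init() is the number of WQEs this chain will
 * consume on the Send Queue; the callers below accumulate it into
 * cc->cc_sqecount for SQ accounting. When no memory registration is
 * required, that count is roughly the sketch below.
 */
#if 0
	nr_wqes = DIV_ROUND_UP(ctxt->rw_nents,
			       direction == DMA_TO_DEVICE ? qp->max_write_sge
							  : qp->max_read_sge);
#endif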
148
149 /**
150  * svc_rdma_cc_init - Initialize an svc_rdma_chunk_ctxt
151  * @rdma: controlling transport instance
152  * @cc: svc_rdma_chunk_ctxt to be initialized
153  */
154 void svc_rdma_cc_init(struct svcxprt_rdma *rdma,
155                       struct svc_rdma_chunk_ctxt *cc)
156 {
157         struct rpc_rdma_cid *cid = &cc->cc_cid;
158
159         if (unlikely(!cid->ci_completion_id))
160                 svc_rdma_send_cid_init(rdma, cid);
161
162         INIT_LIST_HEAD(&cc->cc_rwctxts);
163         cc->cc_sqecount = 0;
164 }
165
166 /**
167  * svc_rdma_cc_release - Release resources held by a svc_rdma_chunk_ctxt
168  * @rdma: controlling transport instance
169  * @cc: svc_rdma_chunk_ctxt to be released
170  * @dir: DMA direction
171  */
172 void svc_rdma_cc_release(struct svcxprt_rdma *rdma,
173                          struct svc_rdma_chunk_ctxt *cc,
174                          enum dma_data_direction dir)
175 {
176         struct llist_node *first, *last;
177         struct svc_rdma_rw_ctxt *ctxt;
178         LLIST_HEAD(free);
179
180         trace_svcrdma_cc_release(&cc->cc_cid, cc->cc_sqecount);
181
182         first = last = NULL;
183         while ((ctxt = svc_rdma_next_ctxt(&cc->cc_rwctxts)) != NULL) {
184                 list_del(&ctxt->rw_list);
185
186                 rdma_rw_ctx_destroy(&ctxt->rw_ctx, rdma->sc_qp,
187                                     rdma->sc_port_num, ctxt->rw_sg_table.sgl,
188                                     ctxt->rw_nents, dir);
189                 __svc_rdma_put_rw_ctxt(ctxt, &free);
190
191                 ctxt->rw_node.next = first;
192                 first = &ctxt->rw_node;
193                 if (!last)
194                         last = first;
195         }
196         if (first)
197                 llist_add_batch(first, last, &rdma->sc_rw_ctxts);
198 }
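/* Editorial sketch (not part of the original file): the loop above links
 * the freed contexts together by hand so that a single llist_add_batch()
 * publishes the entire chain to rdma->sc_rw_ctxts with one atomic
 * exchange, rather than one atomic operation per context. The generic
 * pattern, with "item", "local_list", and "shared_list" as placeholders:
 */
#if 0
	struct llist_node *first = NULL, *last = NULL;

	list_for_each_entry(item, &local_list, list) {
		item->node.next = first;	/* link locally, no atomics */
		first = &item->node;
		if (!last)
			last = first;
	}
	if (first)
		llist_add_batch(first, last, &shared_list);
#endif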
199
200 static struct svc_rdma_write_info *
201 svc_rdma_write_info_alloc(struct svcxprt_rdma *rdma,
202                           const struct svc_rdma_chunk *chunk)
203 {
204         struct svc_rdma_write_info *info;
205
206         info = kzalloc_node(sizeof(*info), GFP_KERNEL,
207                             ibdev_to_node(rdma->sc_cm_id->device));
208         if (!info)
209                 return info;
210
211         info->wi_rdma = rdma;
212         info->wi_chunk = chunk;
213         svc_rdma_cc_init(rdma, &info->wi_cc);
214         info->wi_cc.cc_cqe.done = svc_rdma_write_done;
215         return info;
216 }
217
218 static void svc_rdma_write_info_free_async(struct work_struct *work)
219 {
220         struct svc_rdma_write_info *info;
221
222         info = container_of(work, struct svc_rdma_write_info, wi_work);
223         svc_rdma_cc_release(info->wi_rdma, &info->wi_cc, DMA_TO_DEVICE);
224         kfree(info);
225 }
226
227 static void svc_rdma_write_info_free(struct svc_rdma_write_info *info)
228 {
229         INIT_WORK(&info->wi_work, svc_rdma_write_info_free_async);
230         queue_work(svcrdma_wq, &info->wi_work);
231 }
232
233 /**
234  * svc_rdma_write_chunk_release - Release Write chunk I/O resources
235  * @rdma: controlling transport
236  * @ctxt: Send context that is being released
237  */
238 void svc_rdma_write_chunk_release(struct svcxprt_rdma *rdma,
239                                   struct svc_rdma_send_ctxt *ctxt)
240 {
241         struct svc_rdma_write_info *info;
242         struct svc_rdma_chunk_ctxt *cc;
243
244         while (!list_empty(&ctxt->sc_write_info_list)) {
245                 info = list_first_entry(&ctxt->sc_write_info_list,
246                                         struct svc_rdma_write_info, wi_list);
247                 list_del(&info->wi_list);
248
249                 cc = &info->wi_cc;
250                 svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
251                 svc_rdma_write_info_free(info);
252         }
253 }
254
255 /**
256  * svc_rdma_reply_chunk_release - Release Reply chunk I/O resources
257  * @rdma: controlling transport
258  * @ctxt: Send context that is being released
259  */
260 void svc_rdma_reply_chunk_release(struct svcxprt_rdma *rdma,
261                                   struct svc_rdma_send_ctxt *ctxt)
262 {
263         struct svc_rdma_chunk_ctxt *cc = &ctxt->sc_reply_info.wi_cc;
264
265         if (!cc->cc_sqecount)
266                 return;
267         svc_rdma_cc_release(rdma, cc, DMA_TO_DEVICE);
268 }
269
270 /**
271  * svc_rdma_reply_done - Reply chunk Write completion handler
272  * @cq: controlling Completion Queue
273  * @wc: Work Completion report
274  *
275  * Pages under I/O are released by a subsequent Send completion.
276  */
277 static void svc_rdma_reply_done(struct ib_cq *cq, struct ib_wc *wc)
278 {
279         struct ib_cqe *cqe = wc->wr_cqe;
280         struct svc_rdma_chunk_ctxt *cc =
281                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
282         struct svcxprt_rdma *rdma = cq->cq_context;
283
284         switch (wc->status) {
285         case IB_WC_SUCCESS:
286                 trace_svcrdma_wc_reply(&cc->cc_cid);
287                 return;
288         case IB_WC_WR_FLUSH_ERR:
289                 trace_svcrdma_wc_reply_flush(wc, &cc->cc_cid);
290                 break;
291         default:
292                 trace_svcrdma_wc_reply_err(wc, &cc->cc_cid);
293         }
294
295         svc_xprt_deferred_close(&rdma->sc_xprt);
296 }
297
298 /**
299  * svc_rdma_write_done - Write chunk completion
300  * @cq: controlling Completion Queue
301  * @wc: Work Completion
302  *
303  * Pages under I/O are freed by a subsequent Send completion.
304  */
305 static void svc_rdma_write_done(struct ib_cq *cq, struct ib_wc *wc)
306 {
307         struct svcxprt_rdma *rdma = cq->cq_context;
308         struct ib_cqe *cqe = wc->wr_cqe;
309         struct svc_rdma_chunk_ctxt *cc =
310                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
311
312         switch (wc->status) {
313         case IB_WC_SUCCESS:
314                 trace_svcrdma_wc_write(&cc->cc_cid);
315                 return;
316         case IB_WC_WR_FLUSH_ERR:
317                 trace_svcrdma_wc_write_flush(wc, &cc->cc_cid);
318                 break;
319         default:
320                 trace_svcrdma_wc_write_err(wc, &cc->cc_cid);
321         }
322
323         /* The RDMA Write has flushed, so the client won't get
324          * some of the outgoing RPC message. Signal the loss
325          * to the client by closing the connection.
326          */
327         svc_xprt_deferred_close(&rdma->sc_xprt);
328 }
329
330 /**
331  * svc_rdma_wc_read_done - Handle completion of an RDMA Read ctx
332  * @cq: controlling Completion Queue
333  * @wc: Work Completion
334  *
335  */
336 static void svc_rdma_wc_read_done(struct ib_cq *cq, struct ib_wc *wc)
337 {
338         struct svcxprt_rdma *rdma = cq->cq_context;
339         struct ib_cqe *cqe = wc->wr_cqe;
340         struct svc_rdma_chunk_ctxt *cc =
341                         container_of(cqe, struct svc_rdma_chunk_ctxt, cc_cqe);
342         struct svc_rdma_recv_ctxt *ctxt;
343
344         svc_rdma_wake_send_waiters(rdma, cc->cc_sqecount);
345
346         ctxt = container_of(cc, struct svc_rdma_recv_ctxt, rc_cc);
347         switch (wc->status) {
348         case IB_WC_SUCCESS:
349                 trace_svcrdma_wc_read(wc, &cc->cc_cid, ctxt->rc_readbytes,
350                                       cc->cc_posttime);
351
352                 spin_lock(&rdma->sc_rq_dto_lock);
353                 list_add_tail(&ctxt->rc_list, &rdma->sc_read_complete_q);
354                 /* the unlock pairs with the smp_rmb in svc_xprt_ready */
355                 set_bit(XPT_DATA, &rdma->sc_xprt.xpt_flags);
356                 spin_unlock(&rdma->sc_rq_dto_lock);
357                 svc_xprt_enqueue(&rdma->sc_xprt);
358                 return;
359         case IB_WC_WR_FLUSH_ERR:
360                 trace_svcrdma_wc_read_flush(wc, &cc->cc_cid);
361                 break;
362         default:
363                 trace_svcrdma_wc_read_err(wc, &cc->cc_cid);
364         }
365
366         /* The RDMA Read has flushed, so the incoming RPC message
367          * cannot be constructed and must be dropped. Signal the
368          * loss to the client by closing the connection.
369          */
370         svc_rdma_cc_release(rdma, cc, DMA_FROM_DEVICE);
371         svc_rdma_recv_ctxt_put(rdma, ctxt);
372         svc_xprt_deferred_close(&rdma->sc_xprt);
373 }
374
375 /*
376  * Assumptions:
377  * - If ib_post_send() succeeds, only one completion is expected,
378  *   even if one or more WRs are flushed. This is true when posting
379  *   an rdma_rw_ctx or when posting a single signaled WR.
380  */
381 static int svc_rdma_post_chunk_ctxt(struct svcxprt_rdma *rdma,
382                                     struct svc_rdma_chunk_ctxt *cc)
383 {
384         struct ib_send_wr *first_wr;
385         const struct ib_send_wr *bad_wr;
386         struct list_head *tmp;
387         struct ib_cqe *cqe;
388         int ret;
389
390         might_sleep();
391
392         if (cc->cc_sqecount > rdma->sc_sq_depth)
393                 return -EINVAL;
394
395         first_wr = NULL;
396         cqe = &cc->cc_cqe;
397         list_for_each(tmp, &cc->cc_rwctxts) {
398                 struct svc_rdma_rw_ctxt *ctxt;
399
400                 ctxt = list_entry(tmp, struct svc_rdma_rw_ctxt, rw_list);
401                 first_wr = rdma_rw_ctx_wrs(&ctxt->rw_ctx, rdma->sc_qp,
402                                            rdma->sc_port_num, cqe, first_wr);
403                 cqe = NULL;
404         }
405
406         do {
407                 if (atomic_sub_return(cc->cc_sqecount,
408                                       &rdma->sc_sq_avail) > 0) {
409                         cc->cc_posttime = ktime_get();
410                         ret = ib_post_send(rdma->sc_qp, first_wr, &bad_wr);
411                         if (ret)
412                                 break;
413                         return 0;
414                 }
415
416                 percpu_counter_inc(&svcrdma_stat_sq_starve);
417                 trace_svcrdma_sq_full(rdma, &cc->cc_cid);
418                 atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
419                 wait_event(rdma->sc_send_wait,
420                            atomic_read(&rdma->sc_sq_avail) > cc->cc_sqecount);
421                 trace_svcrdma_sq_retry(rdma, &cc->cc_cid);
422         } while (1);
423
424         trace_svcrdma_sq_post_err(rdma, &cc->cc_cid, ret);
425         svc_xprt_deferred_close(&rdma->sc_xprt);
426
427         /* If even one was posted, there will be a completion. */
428         if (bad_wr != first_wr)
429                 return 0;
430
431         atomic_add(cc->cc_sqecount, &rdma->sc_sq_avail);
432         wake_up(&rdma->sc_send_wait);
433         return -ENOTCONN;
434 }
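/* Editorial sketch (not part of the original file): the posting loop above
 * implements Send Queue flow control with one atomic counter. Its shape,
 * with "needed" standing in for cc->cc_sqecount and error handling elided:
 */
#if 0
	while (1) {
		/* Optimistically reserve the SQEs this chain needs. */
		if (atomic_sub_return(needed, &rdma->sc_sq_avail) > 0)
			return ib_post_send(rdma->sc_qp, first_wr, &bad_wr);

		/* Over-subscribed: return the credits and sleep until Send
		 * and Write completions replenish sc_sq_avail.
		 */
		atomic_add(needed, &rdma->sc_sq_avail);
		wait_event(rdma->sc_send_wait,
			   atomic_read(&rdma->sc_sq_avail) > needed);
	}
#endif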
435
 436 /* Build an SGL that covers one kvec in an xdr_buf; the SGL is
 437  * DMA-mapped later by svc_rdma_rw_ctx_init(). */
438 static void svc_rdma_vec_to_sg(struct svc_rdma_write_info *info,
439                                unsigned int len,
440                                struct svc_rdma_rw_ctxt *ctxt)
441 {
442         struct scatterlist *sg = ctxt->rw_sg_table.sgl;
443
444         sg_set_buf(&sg[0], info->wi_base, len);
445         info->wi_base += len;
446
447         ctxt->rw_nents = 1;
448 }
449
 450 /* Build an SGL that covers part of an xdr_buf's pagelist; the SGL is
 451  * DMA-mapped later by svc_rdma_rw_ctx_init(). */
452 static void svc_rdma_pagelist_to_sg(struct svc_rdma_write_info *info,
453                                     unsigned int remaining,
454                                     struct svc_rdma_rw_ctxt *ctxt)
455 {
456         unsigned int sge_no, sge_bytes, page_off, page_no;
457         const struct xdr_buf *xdr = info->wi_xdr;
458         struct scatterlist *sg;
459         struct page **page;
460
461         page_off = info->wi_next_off + xdr->page_base;
462         page_no = page_off >> PAGE_SHIFT;
463         page_off = offset_in_page(page_off);
464         page = xdr->pages + page_no;
465         info->wi_next_off += remaining;
466         sg = ctxt->rw_sg_table.sgl;
467         sge_no = 0;
468         do {
469                 sge_bytes = min_t(unsigned int, remaining,
470                                   PAGE_SIZE - page_off);
471                 sg_set_page(sg, *page, sge_bytes, page_off);
472
473                 remaining -= sge_bytes;
474                 sg = sg_next(sg);
475                 page_off = 0;
476                 sge_no++;
477                 page++;
478         } while (remaining);
479
480         ctxt->rw_nents = sge_no;
481 }
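/* Editorial worked example (not part of the original file), assuming 4KB
 * pages: with info->wi_next_off == 5000 and xdr->page_base == 100,
 *
 *	page_off = 5000 + 100           = 5100
 *	page_no  = 5100 >> PAGE_SHIFT   = 1
 *	page_off = offset_in_page(5100) = 1004
 *
 * so the first SGE begins 1004 bytes into the second entry of xdr->pages,
 * and each subsequent SGE starts on a page boundary.
 */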
482
483 /* Construct RDMA Write WRs to send a portion of an xdr_buf containing
484  * an RPC Reply.
485  */
486 static int
487 svc_rdma_build_writes(struct svc_rdma_write_info *info,
488                       void (*constructor)(struct svc_rdma_write_info *info,
489                                           unsigned int len,
490                                           struct svc_rdma_rw_ctxt *ctxt),
491                       unsigned int remaining)
492 {
493         struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
494         struct svcxprt_rdma *rdma = info->wi_rdma;
495         const struct svc_rdma_segment *seg;
496         struct svc_rdma_rw_ctxt *ctxt;
497         int ret;
498
499         do {
500                 unsigned int write_len;
501                 u64 offset;
502
503                 if (info->wi_seg_no >= info->wi_chunk->ch_segcount)
504                         goto out_overflow;
505
506                 seg = &info->wi_chunk->ch_segments[info->wi_seg_no];
507                 write_len = min(remaining, seg->rs_length - info->wi_seg_off);
508                 if (!write_len)
509                         goto out_overflow;
510                 ctxt = svc_rdma_get_rw_ctxt(rdma,
511                                             (write_len >> PAGE_SHIFT) + 2);
512                 if (!ctxt)
513                         return -ENOMEM;
514
515                 constructor(info, write_len, ctxt);
516                 offset = seg->rs_offset + info->wi_seg_off;
517                 ret = svc_rdma_rw_ctx_init(rdma, ctxt, offset, seg->rs_handle,
518                                            DMA_TO_DEVICE);
519                 if (ret < 0)
520                         return -EIO;
521                 percpu_counter_inc(&svcrdma_stat_write);
522
523                 list_add(&ctxt->rw_list, &cc->cc_rwctxts);
524                 cc->cc_sqecount += ret;
525                 if (write_len == seg->rs_length - info->wi_seg_off) {
526                         info->wi_seg_no++;
527                         info->wi_seg_off = 0;
528                 } else {
529                         info->wi_seg_off += write_len;
530                 }
531                 remaining -= write_len;
532         } while (remaining);
533
534         return 0;
535
536 out_overflow:
537         trace_svcrdma_small_wrch_err(&cc->cc_cid, remaining, info->wi_seg_no,
538                                      info->wi_chunk->ch_segcount);
539         return -E2BIG;
540 }
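/* Editorial note (not part of the original file): the SGE estimate
 * "(write_len >> PAGE_SHIFT) + 2" above covers the worst case where the
 * payload is aligned to neither end of a page. For example, with 4KB
 * pages, a write_len of 8192 starting at page offset 100 touches three
 * pages; the estimate yields (8192 >> 12) + 2 == 4, which is sufficient.
 */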
541
542 /**
543  * svc_rdma_iov_write - Construct RDMA Writes from an iov
544  * @info: pointer to write arguments
545  * @iov: kvec to write
546  *
547  * Returns:
548  *   On success, returns zero
549  *   %-E2BIG if the client-provided Write chunk is too small
550  *   %-ENOMEM if a resource has been exhausted
551  *   %-EIO if an rdma-rw error occurred
552  */
553 static int svc_rdma_iov_write(struct svc_rdma_write_info *info,
554                               const struct kvec *iov)
555 {
556         info->wi_base = iov->iov_base;
557         return svc_rdma_build_writes(info, svc_rdma_vec_to_sg,
558                                      iov->iov_len);
559 }
560
561 /**
562  * svc_rdma_pages_write - Construct RDMA Writes from pages
563  * @info: pointer to write arguments
564  * @xdr: xdr_buf with pages to write
565  * @offset: offset into the content of @xdr
566  * @length: number of bytes to write
567  *
568  * Returns:
569  *   On success, returns zero
570  *   %-E2BIG if the client-provided Write chunk is too small
571  *   %-ENOMEM if a resource has been exhausted
572  *   %-EIO if an rdma-rw error occurred
573  */
574 static int svc_rdma_pages_write(struct svc_rdma_write_info *info,
575                                 const struct xdr_buf *xdr,
576                                 unsigned int offset,
577                                 unsigned long length)
578 {
579         info->wi_xdr = xdr;
580         info->wi_next_off = offset - xdr->head[0].iov_len;
581         return svc_rdma_build_writes(info, svc_rdma_pagelist_to_sg,
582                                      length);
583 }
584
585 /**
586  * svc_rdma_xb_write - Construct RDMA Writes to write an xdr_buf
587  * @xdr: xdr_buf to write
588  * @data: pointer to write arguments
589  *
590  * Returns:
591  *   On success, returns zero
592  *   %-E2BIG if the client-provided Write chunk is too small
593  *   %-ENOMEM if a resource has been exhausted
594  *   %-EIO if an rdma-rw error occurred
595  */
596 static int svc_rdma_xb_write(const struct xdr_buf *xdr, void *data)
597 {
598         struct svc_rdma_write_info *info = data;
599         int ret;
600
601         if (xdr->head[0].iov_len) {
602                 ret = svc_rdma_iov_write(info, &xdr->head[0]);
603                 if (ret < 0)
604                         return ret;
605         }
606
607         if (xdr->page_len) {
608                 ret = svc_rdma_pages_write(info, xdr, xdr->head[0].iov_len,
609                                            xdr->page_len);
610                 if (ret < 0)
611                         return ret;
612         }
613
614         if (xdr->tail[0].iov_len) {
615                 ret = svc_rdma_iov_write(info, &xdr->tail[0]);
616                 if (ret < 0)
617                         return ret;
618         }
619
620         return xdr->len;
621 }
622
623 /* Link Write WRs for @chunk onto @sctxt's WR chain.
624  */
625 static int svc_rdma_prepare_write_chunk(struct svcxprt_rdma *rdma,
626                                         struct svc_rdma_send_ctxt *sctxt,
627                                         const struct svc_rdma_chunk *chunk,
628                                         const struct xdr_buf *xdr)
629 {
630         struct svc_rdma_write_info *info;
631         struct svc_rdma_chunk_ctxt *cc;
632         struct ib_send_wr *first_wr;
633         struct xdr_buf payload;
634         struct list_head *pos;
635         struct ib_cqe *cqe;
636         int ret;
637
638         if (xdr_buf_subsegment(xdr, &payload, chunk->ch_position,
639                                chunk->ch_payload_length))
640                 return -EMSGSIZE;
641
642         info = svc_rdma_write_info_alloc(rdma, chunk);
643         if (!info)
644                 return -ENOMEM;
645         cc = &info->wi_cc;
646
647         ret = svc_rdma_xb_write(&payload, info);
648         if (ret != payload.len)
649                 goto out_err;
650
651         ret = -EINVAL;
652         if (unlikely(cc->cc_sqecount > rdma->sc_sq_depth))
653                 goto out_err;
654
655         first_wr = sctxt->sc_wr_chain;
656         cqe = &cc->cc_cqe;
657         list_for_each(pos, &cc->cc_rwctxts) {
658                 struct svc_rdma_rw_ctxt *rwc;
659
660                 rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
661                 first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
662                                            rdma->sc_port_num, cqe, first_wr);
663                 cqe = NULL;
664         }
665         sctxt->sc_wr_chain = first_wr;
666         sctxt->sc_sqecount += cc->cc_sqecount;
667         list_add(&info->wi_list, &sctxt->sc_write_info_list);
668
669         trace_svcrdma_post_write_chunk(&cc->cc_cid, cc->cc_sqecount);
670         return 0;
671
672 out_err:
673         svc_rdma_write_info_free(info);
674         return ret;
675 }
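/* Editorial sketch (not part of the original file): the list_for_each()
 * above prepends this chunk's RDMA Write WRs to the Send ctxt's existing
 * WR chain. Only the first rdma_rw_ctx_wrs() call is given the CQE, so a
 * single Write completion covers the whole chunk; later calls pass a NULL
 * cqe and simply extend the chain. Equivalent shape:
 */
#if 0
	first_wr = sctxt->sc_wr_chain;
	cqe = &cc->cc_cqe;
	list_for_each_entry(rwc, &cc->cc_rwctxts, rw_list) {
		first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
					   rdma->sc_port_num, cqe, first_wr);
		cqe = NULL;	/* only the first ctx is signaled */
	}
	sctxt->sc_wr_chain = first_wr;
#endif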
676
677 /**
678  * svc_rdma_prepare_write_list - Construct WR chain for sending Write list
679  * @rdma: controlling RDMA transport
680  * @write_pcl: Write list provisioned by the client
681  * @sctxt: Send WR resources
682  * @xdr: xdr_buf containing an RPC Reply message
683  *
684  * Returns zero on success, or a negative errno if one or more
685  * Write chunks could not be sent.
686  */
687 int svc_rdma_prepare_write_list(struct svcxprt_rdma *rdma,
688                                 const struct svc_rdma_pcl *write_pcl,
689                                 struct svc_rdma_send_ctxt *sctxt,
690                                 const struct xdr_buf *xdr)
691 {
692         struct svc_rdma_chunk *chunk;
693         int ret;
694
695         pcl_for_each_chunk(chunk, write_pcl) {
696                 if (!chunk->ch_payload_length)
697                         break;
698                 ret = svc_rdma_prepare_write_chunk(rdma, sctxt, chunk, xdr);
699                 if (ret < 0)
700                         return ret;
701         }
702         return 0;
703 }
704
705 /**
706  * svc_rdma_prepare_reply_chunk - Construct WR chain for writing the Reply chunk
707  * @rdma: controlling RDMA transport
708  * @write_pcl: Write chunk list provided by client
709  * @reply_pcl: Reply chunk provided by client
710  * @sctxt: Send WR resources
711  * @xdr: xdr_buf containing an RPC Reply
712  *
713  * Returns a non-negative number of bytes the chunk consumed, or
714  *      %-E2BIG if the payload was larger than the Reply chunk,
715  *      %-EINVAL if client provided too many segments,
716  *      %-ENOMEM if rdma_rw context pool was exhausted,
717  *      %-ENOTCONN if posting failed (connection is lost),
718  *      %-EIO if rdma_rw initialization failed (DMA mapping, etc).
719  */
720 int svc_rdma_prepare_reply_chunk(struct svcxprt_rdma *rdma,
721                                  const struct svc_rdma_pcl *write_pcl,
722                                  const struct svc_rdma_pcl *reply_pcl,
723                                  struct svc_rdma_send_ctxt *sctxt,
724                                  const struct xdr_buf *xdr)
725 {
726         struct svc_rdma_write_info *info = &sctxt->sc_reply_info;
727         struct svc_rdma_chunk_ctxt *cc = &info->wi_cc;
728         struct ib_send_wr *first_wr;
729         struct list_head *pos;
730         struct ib_cqe *cqe;
731         int ret;
732
733         info->wi_rdma = rdma;
734         info->wi_chunk = pcl_first_chunk(reply_pcl);
735         info->wi_seg_off = 0;
736         info->wi_seg_no = 0;
737         info->wi_cc.cc_cqe.done = svc_rdma_reply_done;
738
739         ret = pcl_process_nonpayloads(write_pcl, xdr,
740                                       svc_rdma_xb_write, info);
741         if (ret < 0)
742                 return ret;
743
744         first_wr = sctxt->sc_wr_chain;
745         cqe = &cc->cc_cqe;
746         list_for_each(pos, &cc->cc_rwctxts) {
747                 struct svc_rdma_rw_ctxt *rwc;
748
749                 rwc = list_entry(pos, struct svc_rdma_rw_ctxt, rw_list);
750                 first_wr = rdma_rw_ctx_wrs(&rwc->rw_ctx, rdma->sc_qp,
751                                            rdma->sc_port_num, cqe, first_wr);
752                 cqe = NULL;
753         }
754         sctxt->sc_wr_chain = first_wr;
755         sctxt->sc_sqecount += cc->cc_sqecount;
756
757         trace_svcrdma_post_reply_chunk(&cc->cc_cid, cc->cc_sqecount);
758         return xdr->len;
759 }
760
761 /**
762  * svc_rdma_build_read_segment - Build RDMA Read WQEs to pull one RDMA segment
763  * @rqstp: RPC transaction context
764  * @head: context for ongoing I/O
765  * @segment: co-ordinates of remote memory to be read
766  *
767  * Returns:
768  *   %0: the Read WR chain was constructed successfully
769  *   %-EINVAL: there were not enough rq_pages to finish
 770  *   %-ENOMEM: allocating local resources failed
771  *   %-EIO: a DMA mapping error occurred
772  */
773 static int svc_rdma_build_read_segment(struct svc_rqst *rqstp,
774                                        struct svc_rdma_recv_ctxt *head,
775                                        const struct svc_rdma_segment *segment)
776 {
777         struct svcxprt_rdma *rdma = svc_rdma_rqst_rdma(rqstp);
778         struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
779         unsigned int sge_no, seg_len, len;
780         struct svc_rdma_rw_ctxt *ctxt;
781         struct scatterlist *sg;
782         int ret;
783
784         len = segment->rs_length;
785         sge_no = PAGE_ALIGN(head->rc_pageoff + len) >> PAGE_SHIFT;
786         ctxt = svc_rdma_get_rw_ctxt(rdma, sge_no);
787         if (!ctxt)
788                 return -ENOMEM;
789         ctxt->rw_nents = sge_no;
790
791         sg = ctxt->rw_sg_table.sgl;
792         for (sge_no = 0; sge_no < ctxt->rw_nents; sge_no++) {
793                 seg_len = min_t(unsigned int, len,
794                                 PAGE_SIZE - head->rc_pageoff);
795
796                 if (!head->rc_pageoff)
797                         head->rc_page_count++;
798
799                 sg_set_page(sg, rqstp->rq_pages[head->rc_curpage],
800                             seg_len, head->rc_pageoff);
801                 sg = sg_next(sg);
802
803                 head->rc_pageoff += seg_len;
804                 if (head->rc_pageoff == PAGE_SIZE) {
805                         head->rc_curpage++;
806                         head->rc_pageoff = 0;
807                 }
808                 len -= seg_len;
809
810                 if (len && ((head->rc_curpage + 1) > ARRAY_SIZE(rqstp->rq_pages)))
811                         goto out_overrun;
812         }
813
814         ret = svc_rdma_rw_ctx_init(rdma, ctxt, segment->rs_offset,
815                                    segment->rs_handle, DMA_FROM_DEVICE);
816         if (ret < 0)
817                 return -EIO;
818         percpu_counter_inc(&svcrdma_stat_read);
819
820         list_add(&ctxt->rw_list, &cc->cc_rwctxts);
821         cc->cc_sqecount += ret;
822         return 0;
823
824 out_overrun:
825         trace_svcrdma_page_overrun_err(&cc->cc_cid, head->rc_curpage);
826         return -EINVAL;
827 }
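/* Editorial worked example (not part of the original file), assuming 4KB
 * pages: with head->rc_pageoff == 3000 and a segment length of 5000 bytes,
 *
 *	sge_no = PAGE_ALIGN(3000 + 5000) >> PAGE_SHIFT
 *	       = 8192 >> 12 = 2
 *
 * so two SGEs are built: 1096 bytes in the current page and 3904 bytes in
 * the next one.
 */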
828
829 /**
830  * svc_rdma_build_read_chunk - Build RDMA Read WQEs to pull one RDMA chunk
831  * @rqstp: RPC transaction context
832  * @head: context for ongoing I/O
833  * @chunk: Read chunk to pull
834  *
835  * Return values:
836  *   %0: the Read WR chain was constructed successfully
837  *   %-EINVAL: there were not enough resources to finish
 838  *   %-ENOMEM: allocating local resources failed
839  *   %-EIO: a DMA mapping error occurred
840  */
841 static int svc_rdma_build_read_chunk(struct svc_rqst *rqstp,
842                                      struct svc_rdma_recv_ctxt *head,
843                                      const struct svc_rdma_chunk *chunk)
844 {
845         const struct svc_rdma_segment *segment;
846         int ret;
847
848         ret = -EINVAL;
849         pcl_for_each_segment(segment, chunk) {
850                 ret = svc_rdma_build_read_segment(rqstp, head, segment);
851                 if (ret < 0)
852                         break;
853                 head->rc_readbytes += segment->rs_length;
854         }
855         return ret;
856 }
857
858 /**
859  * svc_rdma_copy_inline_range - Copy part of the inline content into pages
860  * @rqstp: RPC transaction context
861  * @head: context for ongoing I/O
862  * @offset: offset into the Receive buffer of region to copy
863  * @remaining: length of region to copy
864  *
865  * Take a page at a time from rqstp->rq_pages and copy the inline
866  * content from the Receive buffer into that page. Update
867  * head->rc_curpage and head->rc_pageoff so that the next RDMA Read
868  * result will land contiguously with the copied content.
869  *
870  * Return values:
871  *   %0: Inline content was successfully copied
872  *   %-EINVAL: offset or length was incorrect
873  */
874 static int svc_rdma_copy_inline_range(struct svc_rqst *rqstp,
875                                       struct svc_rdma_recv_ctxt *head,
876                                       unsigned int offset,
877                                       unsigned int remaining)
878 {
879         unsigned char *dst, *src = head->rc_recv_buf;
880         unsigned int page_no, numpages;
881
882         numpages = PAGE_ALIGN(head->rc_pageoff + remaining) >> PAGE_SHIFT;
883         for (page_no = 0; page_no < numpages; page_no++) {
884                 unsigned int page_len;
885
886                 page_len = min_t(unsigned int, remaining,
887                                  PAGE_SIZE - head->rc_pageoff);
888
889                 if (!head->rc_pageoff)
890                         head->rc_page_count++;
891
892                 dst = page_address(rqstp->rq_pages[head->rc_curpage]);
 893         memcpy(dst + head->rc_pageoff, src + offset, page_len);
894
895                 head->rc_readbytes += page_len;
896                 head->rc_pageoff += page_len;
897                 if (head->rc_pageoff == PAGE_SIZE) {
898                         head->rc_curpage++;
899                         head->rc_pageoff = 0;
900                 }
901                 remaining -= page_len;
902                 offset += page_len;
903         }
904
 905         return 0;
906 }
907
908 /**
909  * svc_rdma_read_multiple_chunks - Construct RDMA Reads to pull data item Read chunks
910  * @rqstp: RPC transaction context
911  * @head: context for ongoing I/O
912  *
913  * The chunk data lands in rqstp->rq_arg as a series of contiguous pages,
914  * like an incoming TCP call.
915  *
916  * Return values:
917  *   %0: RDMA Read WQEs were successfully built
918  *   %-EINVAL: client provided too many chunks or segments,
919  *   %-ENOMEM: rdma_rw context pool was exhausted,
920  *   %-ENOTCONN: posting failed (connection is lost),
921  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
922  */
923 static noinline int
924 svc_rdma_read_multiple_chunks(struct svc_rqst *rqstp,
925                               struct svc_rdma_recv_ctxt *head)
926 {
927         const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
928         struct svc_rdma_chunk *chunk, *next;
929         unsigned int start, length;
930         int ret;
931
932         start = 0;
933         chunk = pcl_first_chunk(pcl);
934         length = chunk->ch_position;
935         ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
936         if (ret < 0)
937                 return ret;
938
939         pcl_for_each_chunk(chunk, pcl) {
940                 ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
941                 if (ret < 0)
942                         return ret;
943
944                 next = pcl_next_chunk(pcl, chunk);
945                 if (!next)
946                         break;
947
948                 start += length;
949                 length = next->ch_position - head->rc_readbytes;
950                 ret = svc_rdma_copy_inline_range(rqstp, head, start, length);
951                 if (ret < 0)
952                         return ret;
953         }
954
955         start += length;
956         length = head->rc_byte_len - start;
957         return svc_rdma_copy_inline_range(rqstp, head, start, length);
958 }
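/* Editorial note (not part of the original file): for a message carrying
 * several Read chunks, the function above reassembles rq_arg by
 * alternating inline copies with RDMA Reads, for example:
 *
 *	[inline][chunk 1 payload][inline][chunk 2 payload][inline tail]
 *
 * Each chunk's ch_position gives the XDR offset where its payload must
 * land, and head->rc_readbytes tracks how much of the reassembled message
 * has been laid down so far.
 */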
959
960 /**
961  * svc_rdma_read_data_item - Construct RDMA Reads to pull data item Read chunks
962  * @rqstp: RPC transaction context
963  * @head: context for ongoing I/O
964  *
965  * The chunk data lands in the page list of rqstp->rq_arg.pages.
966  *
967  * Currently NFSD does not look at the rqstp->rq_arg.tail[0] kvec.
968  * Therefore, XDR round-up of the Read chunk and trailing
969  * inline content must both be added at the end of the pagelist.
970  *
971  * Return values:
972  *   %0: RDMA Read WQEs were successfully built
973  *   %-EINVAL: client provided too many chunks or segments,
974  *   %-ENOMEM: rdma_rw context pool was exhausted,
975  *   %-ENOTCONN: posting failed (connection is lost),
976  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
977  */
978 static int svc_rdma_read_data_item(struct svc_rqst *rqstp,
979                                    struct svc_rdma_recv_ctxt *head)
980 {
981         return svc_rdma_build_read_chunk(rqstp, head,
982                                          pcl_first_chunk(&head->rc_read_pcl));
983 }
984
985 /**
986  * svc_rdma_read_chunk_range - Build RDMA Read WRs for portion of a chunk
987  * @rqstp: RPC transaction context
988  * @head: context for ongoing I/O
989  * @chunk: parsed Call chunk to pull
990  * @offset: offset of region to pull
991  * @length: length of region to pull
992  *
993  * Return values:
994  *   %0: RDMA Read WQEs were successfully built
995  *   %-EINVAL: there were not enough resources to finish
996  *   %-ENOMEM: rdma_rw context pool was exhausted,
997  *   %-ENOTCONN: posting failed (connection is lost),
998  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
999  */
1000 static int svc_rdma_read_chunk_range(struct svc_rqst *rqstp,
1001                                      struct svc_rdma_recv_ctxt *head,
1002                                      const struct svc_rdma_chunk *chunk,
1003                                      unsigned int offset, unsigned int length)
1004 {
1005         const struct svc_rdma_segment *segment;
1006         int ret;
1007
1008         ret = -EINVAL;
1009         pcl_for_each_segment(segment, chunk) {
1010                 struct svc_rdma_segment dummy;
1011
1012                 if (offset > segment->rs_length) {
1013                         offset -= segment->rs_length;
1014                         continue;
1015                 }
1016
1017                 dummy.rs_handle = segment->rs_handle;
1018                 dummy.rs_length = min_t(u32, length, segment->rs_length) - offset;
1019                 dummy.rs_offset = segment->rs_offset + offset;
1020
1021                 ret = svc_rdma_build_read_segment(rqstp, head, &dummy);
1022                 if (ret < 0)
1023                         break;
1024
1025                 head->rc_readbytes += dummy.rs_length;
1026                 length -= dummy.rs_length;
1027                 offset = 0;
1028         }
1029         return ret;
1030 }
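/* Editorial worked example (not part of the original file): pulling a
 * 1500-byte region that starts 200 bytes into a chunk whose first segment
 * is 1000 bytes long. For the first segment,
 *
 *	dummy.rs_length = min(1500, 1000) - 200 = 800
 *	dummy.rs_offset = segment->rs_offset + 200
 *
 * leaving length == 700 and offset == 0 for the second segment.
 */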
1031
1032 /**
1033  * svc_rdma_read_call_chunk - Build RDMA Read WQEs to pull a Long Message
1034  * @rqstp: RPC transaction context
1035  * @head: context for ongoing I/O
1036  *
1037  * Return values:
1038  *   %0: RDMA Read WQEs were successfully built
1039  *   %-EINVAL: there were not enough resources to finish
1040  *   %-ENOMEM: rdma_rw context pool was exhausted,
1041  *   %-ENOTCONN: posting failed (connection is lost),
1042  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1043  */
1044 static int svc_rdma_read_call_chunk(struct svc_rqst *rqstp,
1045                                     struct svc_rdma_recv_ctxt *head)
1046 {
1047         const struct svc_rdma_chunk *call_chunk =
1048                         pcl_first_chunk(&head->rc_call_pcl);
1049         const struct svc_rdma_pcl *pcl = &head->rc_read_pcl;
1050         struct svc_rdma_chunk *chunk, *next;
1051         unsigned int start, length;
1052         int ret;
1053
1054         if (pcl_is_empty(pcl))
1055                 return svc_rdma_build_read_chunk(rqstp, head, call_chunk);
1056
1057         start = 0;
1058         chunk = pcl_first_chunk(pcl);
1059         length = chunk->ch_position;
1060         ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1061                                         start, length);
1062         if (ret < 0)
1063                 return ret;
1064
1065         pcl_for_each_chunk(chunk, pcl) {
1066                 ret = svc_rdma_build_read_chunk(rqstp, head, chunk);
1067                 if (ret < 0)
1068                         return ret;
1069
1070                 next = pcl_next_chunk(pcl, chunk);
1071                 if (!next)
1072                         break;
1073
1074                 start += length;
1075                 length = next->ch_position - head->rc_readbytes;
1076                 ret = svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1077                                                 start, length);
1078                 if (ret < 0)
1079                         return ret;
1080         }
1081
1082         start += length;
1083         length = call_chunk->ch_length - start;
1084         return svc_rdma_read_chunk_range(rqstp, head, call_chunk,
1085                                          start, length);
1086 }
1087
1088 /**
1089  * svc_rdma_read_special - Build RDMA Read WQEs to pull a Long Message
1090  * @rqstp: RPC transaction context
1091  * @head: context for ongoing I/O
1092  *
1093  * The start of the data lands in the first page just after the
1094  * Transport header, and the rest lands in rqstp->rq_arg.pages.
1095  *
1096  * Assumptions:
 1097  *      - A PZRC (Position Zero Read Chunk) is never sent in an
 1098  *        RDMA_MSG message, though it's allowed by spec.
1099  *
1100  * Return values:
1101  *   %0: RDMA Read WQEs were successfully built
1102  *   %-EINVAL: client provided too many chunks or segments,
1103  *   %-ENOMEM: rdma_rw context pool was exhausted,
1104  *   %-ENOTCONN: posting failed (connection is lost),
1105  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1106  */
1107 static noinline int svc_rdma_read_special(struct svc_rqst *rqstp,
1108                                           struct svc_rdma_recv_ctxt *head)
1109 {
1110         return svc_rdma_read_call_chunk(rqstp, head);
1111 }
1112
1113 /* Pages under I/O have been copied to head->rc_pages. Ensure that
1114  * svc_xprt_release() does not put them when svc_rdma_recvfrom()
1115  * returns. This has to be done after all Read WRs are constructed
1116  * to properly handle a page that happens to be part of I/O on behalf
1117  * of two different RDMA segments.
1118  *
1119  * Note: if the subsequent post_send fails, these pages have already
1120  * been moved to head->rc_pages and thus will be cleaned up by
1121  * svc_rdma_recv_ctxt_put().
1122  */
1123 static void svc_rdma_clear_rqst_pages(struct svc_rqst *rqstp,
1124                                       struct svc_rdma_recv_ctxt *head)
1125 {
1126         unsigned int i;
1127
1128         for (i = 0; i < head->rc_page_count; i++) {
1129                 head->rc_pages[i] = rqstp->rq_pages[i];
1130                 rqstp->rq_pages[i] = NULL;
1131         }
1132 }
1133
1134 /**
1135  * svc_rdma_process_read_list - Pull list of Read chunks from the client
1136  * @rdma: controlling RDMA transport
1137  * @rqstp: set of pages to use as Read sink buffers
1138  * @head: pages under I/O collect here
1139  *
1140  * The RPC/RDMA protocol assumes that the upper layer's XDR decoders
1141  * pull each Read chunk as they decode an incoming RPC message.
1142  *
1143  * On Linux, however, the server needs to have a fully-constructed RPC
1144  * message in rqstp->rq_arg when there is a positive return code from
1145  * ->xpo_recvfrom. So the Read list is safety-checked immediately when
1146  * it is received, then here the whole Read list is pulled all at once.
1147  * The ingress RPC message is fully reconstructed once all associated
1148  * RDMA Reads have completed.
1149  *
1150  * Return values:
1151  *   %1: all needed RDMA Reads were posted successfully,
1152  *   %-EINVAL: client provided too many chunks or segments,
1153  *   %-ENOMEM: rdma_rw context pool was exhausted,
1154  *   %-ENOTCONN: posting failed (connection is lost),
1155  *   %-EIO: rdma_rw initialization failed (DMA mapping, etc).
1156  */
1157 int svc_rdma_process_read_list(struct svcxprt_rdma *rdma,
1158                                struct svc_rqst *rqstp,
1159                                struct svc_rdma_recv_ctxt *head)
1160 {
1161         struct svc_rdma_chunk_ctxt *cc = &head->rc_cc;
1162         int ret;
1163
1164         cc->cc_cqe.done = svc_rdma_wc_read_done;
1165         cc->cc_sqecount = 0;
1166         head->rc_pageoff = 0;
1167         head->rc_curpage = 0;
1168         head->rc_readbytes = 0;
1169
1170         if (pcl_is_empty(&head->rc_call_pcl)) {
1171                 if (head->rc_read_pcl.cl_count == 1)
1172                         ret = svc_rdma_read_data_item(rqstp, head);
1173                 else
1174                         ret = svc_rdma_read_multiple_chunks(rqstp, head);
1175         } else
1176                 ret = svc_rdma_read_special(rqstp, head);
1177         svc_rdma_clear_rqst_pages(rqstp, head);
1178         if (ret < 0)
1179                 return ret;
1180
1181         trace_svcrdma_post_read_chunk(&cc->cc_cid, cc->cc_sqecount);
1182         ret = svc_rdma_post_chunk_ctxt(rdma, cc);
1183         return ret < 0 ? ret : 1;
1184 }