Add an optimization to pthread aio writes to also do fsync if requested.
[mat/samba.git] / source3 / modules / vfs_aio_pthread.c
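The fsync optimization added here only fires when smbd has decided the write must be synced; that decision comes from the smb.conf options read via lp_strict_sync()/lp_syncalways() below plus the client's write-through request. For orientation, a minimal, untested smb.conf sketch wiring up this module and the parameters the code reads (the share name and path are placeholders; depending on the smbd version, "aio read size"/"aio write size" may also need to be non-zero before smbd issues AIO at all):

    [data]
        path = /srv/data
        vfs objects = aio_pthread
        ; per-module parameters read in aio_pthread_connect() and aio_pthread_open_fn()
        aio_pthread:aio num threads = 100
        aio_pthread:aio open = yes
        ; the worker-thread fsync only triggers when "strict sync" is set and
        ; either "sync always" is set or the client requested write-through
        strict sync = yes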
/*
 * Simulate Posix AIO using pthreads.
 *
 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
 *
 * Copyright (C) Volker Lendecke 2008
 * Copyright (C) Jeremy Allison 2012
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program.  If not, see <http://www.gnu.org/licenses/>.
 */

#include "includes.h"
#include "system/filesys.h"
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
#include "lib/pthreadpool/pthreadpool.h"

struct aio_extra;
static struct pthreadpool *pool;
static int aio_pthread_jobid;

struct aio_private_data {
	struct aio_private_data *prev, *next;
	int jobid;
	SMB_STRUCT_AIOCB *aiocb;
	ssize_t ret_size;
	int ret_errno;
	bool cancelled;
	bool write_command;	/* pwrite rather than pread. */
	bool flush_write;	/* fsync after a successful write. */
};

/* List of outstanding requests we have. */
static struct aio_private_data *pd_list;

static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p);


/************************************************************************
 Ensure thread pool is initialized.
***********************************************************************/

static bool init_aio_threadpool(struct event_context *ev_ctx,
				struct pthreadpool **pp_pool,
				void (*completion_fn)(struct event_context *,
						struct fd_event *,
						uint16,
						void *))
{
	struct fd_event *sock_event = NULL;
	int ret = 0;

	if (*pp_pool) {
		return true;
	}

	ret = pthreadpool_init(aio_pending_size, pp_pool);
	if (ret) {
		errno = ret;
		return false;
	}
	sock_event = tevent_add_fd(ev_ctx,
				NULL,
				pthreadpool_signal_fd(*pp_pool),
				TEVENT_FD_READ,
				completion_fn,
				NULL);
	if (sock_event == NULL) {
		pthreadpool_destroy(*pp_pool);
		*pp_pool = NULL;
		return false;
	}

	DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
		  aio_pending_size));

	return true;
}


/************************************************************************
 Worker function - core of the pthread aio engine.
 This is the function that actually does the IO.
***********************************************************************/

static void aio_worker(void *private_data)
{
	struct aio_private_data *pd =
			(struct aio_private_data *)private_data;

	if (pd->write_command) {
		pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
				(const void *)pd->aiocb->aio_buf,
				pd->aiocb->aio_nbytes,
				pd->aiocb->aio_offset);
		if (pd->ret_size == -1 && errno == ESPIPE) {
			/* Maintain the fiction that pipes can
			   be seeked (sought?) on. */
			pd->ret_size = sys_write(pd->aiocb->aio_fildes,
					(const void *)pd->aiocb->aio_buf,
					pd->aiocb->aio_nbytes);
		}
		if (pd->ret_size != -1 && pd->flush_write) {
			/*
			 * Optimization - do the requested flush here
			 * on the worker thread, so the data should
			 * already be on disk by the time the main
			 * smbd code syncs. Ignore any error, as the
			 * upper layer will also do this fsync.
			 */
			(void)fsync(pd->aiocb->aio_fildes);
		}
	} else {
		pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
				(void *)pd->aiocb->aio_buf,
				pd->aiocb->aio_nbytes,
				pd->aiocb->aio_offset);
		if (pd->ret_size == -1 && errno == ESPIPE) {
			/* Maintain the fiction that pipes can
			   be seeked (sought?) on. */
			pd->ret_size = sys_read(pd->aiocb->aio_fildes,
					(void *)pd->aiocb->aio_buf,
					pd->aiocb->aio_nbytes);
		}
	}
	if (pd->ret_size == -1) {
		pd->ret_errno = errno;
	} else {
		pd->ret_errno = 0;
	}
}

/************************************************************************
 Private data destructor.
***********************************************************************/

static int pd_destructor(struct aio_private_data *pd)
{
	DLIST_REMOVE(pd_list, pd);
	return 0;
}

/************************************************************************
 Create and initialize a private data struct.
***********************************************************************/

static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
					SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
	if (!pd) {
		return NULL;
	}
	pd->jobid = aio_pthread_jobid++;
	pd->aiocb = aiocb;
	pd->ret_size = -1;
	pd->ret_errno = EINPROGRESS;
	talloc_set_destructor(pd, pd_destructor);
	DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
	return pd;
}

/************************************************************************
 Spin off a threadpool (if needed) and initiate a pread call.
***********************************************************************/

static int aio_pthread_read(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
	struct aio_private_data *pd = NULL;
	int ret;

	if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
				&pool,
				aio_pthread_handle_completion)) {
		return -1;
	}

	pd = create_private_data(aio_ex, aiocb);
	if (pd == NULL) {
		DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
		return -1;
	}

	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
		"of %llu bytes at offset %llu\n",
		pd->jobid,
		(unsigned long long)pd->aiocb->aio_nbytes,
		(unsigned long long)pd->aiocb->aio_offset));

	return 0;
}

/************************************************************************
 Spin off a threadpool (if needed) and initiate a pwrite call.
***********************************************************************/

static int aio_pthread_write(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
	struct aio_private_data *pd = NULL;
	int ret;

	if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
				&pool,
				aio_pthread_handle_completion)) {
		return -1;
	}

	pd = create_private_data(aio_ex, aiocb);
	if (pd == NULL) {
		DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
		return -1;
	}

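	/*
	 * Decide up front whether the worker thread should also fsync:
	 * only when "strict sync" is set and either "sync always" is
	 * set or the client asked for write-through on this request.
	 */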
	pd->write_command = true;
	if (lp_strict_sync(SNUM(fsp->conn)) &&
			(lp_syncalways(SNUM(fsp->conn)) ||
				aio_write_through_requested(aio_ex))) {
		pd->flush_write = true;
	}

	ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
		"of %llu bytes at offset %llu\n",
		pd->jobid,
		(unsigned long long)pd->aiocb->aio_nbytes,
		(unsigned long long)pd->aiocb->aio_offset));

	return 0;
}

/************************************************************************
 Find the private data by jobid.
***********************************************************************/

static struct aio_private_data *find_private_data_by_jobid(int jobid)
{
	struct aio_private_data *pd;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->jobid == jobid) {
			return pd;
		}
	}

	return NULL;
}

/************************************************************************
 Callback when an IO completes.
***********************************************************************/

static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct aio_extra *aio_ex = NULL;
	struct aio_private_data *pd = NULL;
	int jobid = 0;
	int ret;

	DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	ret = pthreadpool_finished_job(pool, &jobid);
	if (ret) {
		smb_panic("aio_pthread_handle_completion");
		return;
	}

	pd = find_private_data_by_jobid(jobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
			  jobid));
		return;
	}

	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);

	DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
		jobid));
	TALLOC_FREE(aio_ex);
}

/************************************************************************
 Find the private data by aiocb.
***********************************************************************/

static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->aiocb == aiocb) {
			return pd;
		}
	}

	return NULL;
}

/************************************************************************
 Called to return the result of a completed AIO.
 Should only be called if aio_error returns something other than EINPROGRESS.
 Returns:
	-1 - the IO failed or was cancelled (errno is set).
	Any other value - return from the IO operation.
***********************************************************************/

static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

	if (pd == NULL) {
		errno = EINVAL;
		DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
		return -1;
	}

	pd->aiocb = NULL;

	if (pd->cancelled) {
		errno = ECANCELED;
		return -1;
	}

	if (pd->ret_size == -1) {
		errno = pd->ret_errno;
	}

	return pd->ret_size;
}

/************************************************************************
 Called to check the result of an AIO.
 Returns:
	EINPROGRESS - still in progress.
	EINVAL - invalid aiocb.
	ECANCELED - request was cancelled.
	0 - request completed successfully.
	Any other value - errno from IO operation.
***********************************************************************/

static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
			     struct files_struct *fsp,
			     SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

	if (pd == NULL) {
		return EINVAL;
	}
	if (pd->cancelled) {
		return ECANCELED;
	}
	return pd->ret_errno;
}

/************************************************************************
 Called to request the cancel of an AIO, or all of them on a specific
 fsp if aiocb == NULL.
***********************************************************************/

static int aio_pthread_cancel(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = NULL;

	for (pd = pd_list; pd != NULL; pd = pd->next) {
		if (pd->aiocb == NULL) {
			continue;
		}
		if (pd->aiocb->aio_fildes != fsp->fh->fd) {
			continue;
		}
		if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
			continue;
		}

		/*
		 * We let the worker thread do its job, but we discard
		 * the result when it's finished.
		 */

		pd->cancelled = true;
	}

	return AIO_CANCELED;
}

/************************************************************************
 Callback for a previously detected job completion.
***********************************************************************/

static void aio_pthread_handle_immediate(struct tevent_context *ctx,
				struct tevent_immediate *im,
				void *private_data)
{
	struct aio_extra *aio_ex = NULL;
	struct aio_private_data *pd = (struct aio_private_data *)private_data;

	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);
	TALLOC_FREE(aio_ex);
}

/************************************************************************
 Private data struct used in suspend completion code.
***********************************************************************/

struct suspend_private {
	int num_entries;
	int num_finished;
	const SMB_STRUCT_AIOCB * const *aiocb_array;
};

/************************************************************************
 Callback when an IO completes from a suspend call.
***********************************************************************/

static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct suspend_private *sp = (struct suspend_private *)p;
	struct aio_private_data *pd = NULL;
	struct tevent_immediate *im = NULL;
	int jobid;
	int i;

	DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	if (pthreadpool_finished_job(pool, &jobid)) {
		smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
		return;
	}

	pd = find_private_data_by_jobid(jobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_suspend_completion cannot find "
			  "jobid %d\n", jobid));
		return;
	}

	/* Is this a jobid with an aiocb we're interested in ? */
	for (i = 0; i < sp->num_entries; i++) {
		if (sp->aiocb_array[i] == pd->aiocb) {
			sp->num_finished++;
			return;
		}
	}

	/* A jobid completed that we weren't waiting for.
	   We must reschedule this as an immediate event
	   on the main event context. */
	im = tevent_create_immediate(NULL);
	if (!im) {
		exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
	}

	DEBUG(10,("aio_pthread_handle_suspend_completion: "
			"re-scheduling job id %d\n",
			jobid));

	tevent_schedule_immediate(im,
			server_event_context(),
			aio_pthread_handle_immediate,
			(void *)pd);
}


static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
					struct tevent_timer *te,
					struct timeval now,
					void *private_data)
{
	bool *timed_out = (bool *)private_data;
	/* Remove this timed event handler. */
	TALLOC_FREE(te);
	*timed_out = true;
}

/************************************************************************
 Called to request everything to stop until all IO is completed.
***********************************************************************/

static int aio_pthread_suspend(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			const SMB_STRUCT_AIOCB * const aiocb_array[],
			int n,
			const struct timespec *timeout)
{
	struct event_context *ev = NULL;
	struct fd_event *sock_event = NULL;
	int ret = -1;
	struct suspend_private sp;
	bool timed_out = false;
	TALLOC_CTX *frame = talloc_stackframe();

	/* This is a blocking call, and has to use a sub-event loop. */
	ev = event_context_init(frame);
	if (ev == NULL) {
		errno = ENOMEM;
		goto out;
	}

	if (timeout) {
		struct timeval tv = convert_timespec_to_timeval(*timeout);
		struct tevent_timer *te = tevent_add_timer(ev,
						frame,
						timeval_current_ofs(tv.tv_sec,
								    tv.tv_usec),
						aio_pthread_suspend_timed_out,
						&timed_out);
		if (!te) {
			errno = ENOMEM;
			goto out;
		}
	}

	ZERO_STRUCT(sp);
	sp.num_entries = n;
	sp.aiocb_array = aiocb_array;
	sp.num_finished = 0;

	sock_event = tevent_add_fd(ev,
				frame,
				pthreadpool_signal_fd(pool),
				TEVENT_FD_READ,
				aio_pthread_handle_suspend_completion,
				(void *)&sp);
	if (sock_event == NULL) {
		pthreadpool_destroy(pool);
		pool = NULL;
		goto out;
	}
	/*
	 * We're going to cheat here. We know that smbd/aio.c
	 * only calls this when it's waiting for every single
	 * outstanding call to finish on a close, so just wait
	 * individually for each IO to complete. We don't care
	 * what order they finish - only that they all do. JRA.
	 */
	while (sp.num_entries != sp.num_finished) {
		if (tevent_loop_once(ev) == -1) {
			goto out;
		}

		if (timed_out) {
			errno = EAGAIN;
			goto out;
		}
	}

	ret = 0;

  out:

	TALLOC_FREE(frame);
	return ret;
}

#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
/*
 * We must have openat() to do any thread-based
 * asynchronous opens. We also must be using
 * thread-specific credentials (Linux-only
 * for now).
 */

/*
 * NB. This threadpool is shared over all
 * instances of this VFS module in this
 * process, as is the current jobid.
 */

static struct pthreadpool *open_pool;
static int aio_pthread_open_jobid;

struct aio_open_private_data {
	struct aio_open_private_data *prev, *next;
	/* Inputs. */
	int jobid;
	int dir_fd;
	int flags;
	mode_t mode;
	uint64_t mid;
	bool in_progress;
	const char *fname;
	char *dname;
	struct smbd_server_connection *sconn;
	const struct security_unix_token *ux_tok;
	/* Returns. */
	int ret_fd;
	int ret_errno;
};

/* List of outstanding requests we have. */
static struct aio_open_private_data *open_pd_list;

/************************************************************************
 Find the open private data by jobid.
***********************************************************************/

static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
{
	struct aio_open_private_data *opd;

	for (opd = open_pd_list; opd != NULL; opd = opd->next) {
		if (opd->jobid == jobid) {
			return opd;
		}
	}

	return NULL;
}

/************************************************************************
 Find the open private data by mid.
***********************************************************************/

static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
{
	struct aio_open_private_data *opd;

	for (opd = open_pd_list; opd != NULL; opd = opd->next) {
		if (opd->mid == mid) {
			return opd;
		}
	}

	return NULL;
}

/************************************************************************
 Callback when an open completes.
***********************************************************************/

static void aio_open_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct aio_open_private_data *opd = NULL;
	int jobid = 0;
	int ret;

	DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
		(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	ret = pthreadpool_finished_job(open_pool, &jobid);
	if (ret) {
		smb_panic("aio_open_handle_completion");
		/* notreached. */
		return;
	}

	opd = find_open_private_data_by_jobid(jobid);
	if (opd == NULL) {
		DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
			jobid));
		smb_panic("aio_open_handle_completion - no jobid");
		/* notreached. */
		return;
	}

	DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
		"for file %s/%s completed\n",
		jobid,
		(unsigned long long)opd->mid,
		opd->dname,
		opd->fname));

	opd->in_progress = false;

	/* Find outstanding event and reschedule. */
	if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
		/*
		 * Outstanding event didn't exist or was
		 * cancelled. Free up the fd and throw
		 * away the result.
		 */
		if (opd->ret_fd != -1) {
			close(opd->ret_fd);
			opd->ret_fd = -1;
		}
		TALLOC_FREE(opd);
	}
}

/*****************************************************************
 The core of the async open code - the worker function. Note we
 use the new openat() system call to avoid any problems with
 current working directory changes plus we change credentials
 on the thread to prevent any security race conditions.
*****************************************************************/

static void aio_open_worker(void *private_data)
{
	struct aio_open_private_data *opd =
		(struct aio_open_private_data *)private_data;

	/* Become the correct credential on this thread. */
	if (set_thread_credentials(opd->ux_tok->uid,
				opd->ux_tok->gid,
				(size_t)opd->ux_tok->ngroups,
				opd->ux_tok->groups) != 0) {
		opd->ret_fd = -1;
		opd->ret_errno = errno;
		return;
	}

	opd->ret_fd = openat(opd->dir_fd,
			opd->fname,
			opd->flags,
			opd->mode);

	if (opd->ret_fd == -1) {
		opd->ret_errno = errno;
	} else {
		/* Create was successful. */
		opd->ret_errno = 0;
	}
}

/************************************************************************
 Open private data destructor.
***********************************************************************/

static int opd_destructor(struct aio_open_private_data *opd)
{
	if (opd->dir_fd != -1) {
		close(opd->dir_fd);
	}
	DLIST_REMOVE(open_pd_list, opd);
	return 0;
}

/************************************************************************
 Create and initialize a private data struct for async open.
***********************************************************************/

static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
					int flags,
					mode_t mode)
{
	struct aio_open_private_data *opd = talloc_zero(NULL,
					struct aio_open_private_data);
	const char *fname = NULL;

	if (!opd) {
		return NULL;
	}

	opd->jobid = aio_pthread_open_jobid++;
	opd->dir_fd = -1;
	opd->ret_fd = -1;
	opd->ret_errno = EINPROGRESS;
	opd->flags = flags;
	opd->mode = mode;
	opd->mid = fsp->mid;
	opd->in_progress = true;
	opd->sconn = fsp->conn->sconn;

	/* Copy our current credentials. */
	opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
	if (opd->ux_tok == NULL) {
		TALLOC_FREE(opd);
		return NULL;
	}

	/*
	 * Copy the parent directory name and the
	 * relative path within it.
	 */
	if (parent_dirname(opd,
			fsp->fsp_name->base_name,
			&opd->dname,
			&fname) == false) {
		TALLOC_FREE(opd);
		return NULL;
	}
	opd->fname = talloc_strdup(opd, fname);
	if (opd->fname == NULL) {
		TALLOC_FREE(opd);
		return NULL;
	}

#if defined(O_DIRECTORY)
	opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
#else
	opd->dir_fd = open(opd->dname, O_RDONLY);
#endif
	if (opd->dir_fd == -1) {
		TALLOC_FREE(opd);
		return NULL;
	}

	talloc_set_destructor(opd, opd_destructor);
	DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
	return opd;
}

/*****************************************************************
 Setup an async open.
*****************************************************************/

static int open_async(const files_struct *fsp,
			int flags,
			mode_t mode)
{
	struct aio_open_private_data *opd = NULL;
	int ret;

	if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
			&open_pool,
			aio_open_handle_completion)) {
		return -1;
	}

	opd = create_private_open_data(fsp, flags, mode);
	if (opd == NULL) {
		DEBUG(10, ("open_async: Could not create private data.\n"));
		return -1;
	}

	ret = pthreadpool_add_job(open_pool,
				opd->jobid,
				aio_open_worker,
				(void *)opd);
	if (ret) {
		errno = ret;
		return -1;
	}

	DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
		(unsigned long long)opd->mid,
		opd->jobid,
		opd->dname,
		opd->fname));

	/* Cause the calling code to reschedule us. */
	errno = EINTR; /* Maps to NT_STATUS_RETRY. */
	return -1;
}

/*****************************************************************
 Look for a matching SMB2 mid. If we find one, this is a
 rescheduled call, so just return the completed open.
*****************************************************************/

static bool find_completed_open(files_struct *fsp,
				int *p_fd,
				int *p_errno)
{
	struct aio_open_private_data *opd;

	opd = find_open_private_data_by_mid(fsp->mid);
	if (!opd) {
		return false;
	}

	if (opd->in_progress) {
		DEBUG(0,("find_completed_open: mid %llu "
			"jobid %d still in progress for "
			"file %s/%s. PANIC !\n",
			(unsigned long long)opd->mid,
			opd->jobid,
			opd->dname,
			opd->fname));
		/* Disaster ! This is an open timeout. Just panic. */
		smb_panic("find_completed_open - in_progress\n");
		/* notreached. */
		return false;
	}

	*p_fd = opd->ret_fd;
	*p_errno = opd->ret_errno;

	DEBUG(5,("find_completed_open: mid %llu returning "
		"fd = %d, errno = %d (%s) "
		"jobid (%d) for file %s\n",
		(unsigned long long)opd->mid,
		opd->ret_fd,
		opd->ret_errno,
		strerror(opd->ret_errno),
		opd->jobid,
		smb_fname_str_dbg(fsp->fsp_name)));

	/* Now we can free the opd. */
	TALLOC_FREE(opd);
	return true;
}

/*****************************************************************
 The core open function. Only go async on O_CREAT|O_EXCL
 opens to prevent any race conditions.
*****************************************************************/

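/*
 * Flow of an async create: the first call ends up in open_async(),
 * which queues the open to a helper thread and returns -1 with
 * errno = EINTR so the SMB layer defers the request. When the worker
 * finishes, aio_open_handle_completion() reschedules the deferred
 * open; the re-issued call then finds the result via
 * find_completed_open() and returns the fd (or errno) the worker
 * produced.
 */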
static int aio_pthread_open_fn(vfs_handle_struct *handle,
			struct smb_filename *smb_fname,
			files_struct *fsp,
			int flags,
			mode_t mode)
{
	int my_errno = 0;
	int fd = -1;
	bool aio_allow_open = lp_parm_bool(
		SNUM(handle->conn), "aio_pthread", "aio open", false);

	if (smb_fname->stream_name) {
		/* Don't handle stream opens. */
		errno = ENOENT;
		return -1;
	}

	if (!aio_allow_open) {
		/* aio opens turned off. */
		return open(smb_fname->base_name, flags, mode);
	}

	if (!(flags & O_CREAT)) {
		/* Only creates matter. */
		return open(smb_fname->base_name, flags, mode);
	}

	if (!(flags & O_EXCL)) {
		/* Only creates with O_EXCL matter. */
		return open(smb_fname->base_name, flags, mode);
	}

	/*
	 * See if this is a reentrant call - i.e. is this a
	 * restart of an existing open that just completed.
	 */

	if (find_completed_open(fsp,
				&fd,
				&my_errno)) {
		errno = my_errno;
		return fd;
	}

	/* Ok, it's a create exclusive call - pass it to a thread helper. */
	return open_async(fsp, flags, mode);
}
#endif

static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
			       const char *user)
{
	/*********************************************************************
	 * How many threads to initialize ?
	 * 100 per process seems insane as a default until you realize that
	 * (a) Threads terminate after 1 second when idle.
	 * (b) Throttling is done in SMB2 via the crediting algorithm.
	 * (c) SMB1 clients are limited to max_mux (50) outstanding
	 *     requests and Windows clients don't use this anyway.
	 * Essentially we want this to be unlimited unless smb.conf
	 * says different.
	 *********************************************************************/
	aio_pending_size = lp_parm_int(
		SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
	return SMB_VFS_NEXT_CONNECT(handle, service, user);
}

static struct vfs_fn_pointers vfs_aio_pthread_fns = {
	.connect_fn = aio_pthread_connect,
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
	.open_fn = aio_pthread_open_fn,
#endif
	.aio_read_fn = aio_pthread_read,
	.aio_write_fn = aio_pthread_write,
	.aio_return_fn = aio_pthread_return_fn,
	.aio_cancel_fn = aio_pthread_cancel,
	.aio_error_fn = aio_pthread_error_fn,
	.aio_suspend_fn = aio_pthread_suspend,
};

NTSTATUS vfs_aio_pthread_init(void);
NTSTATUS vfs_aio_pthread_init(void)
{
	return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
				"aio_pthread", &vfs_aio_pthread_fns);
}