Use HAVE_FSYNC, we bothered to test for it.
[samba.git] / source3 / modules / vfs_aio_pthread.c
1 /*
2  * Simulate Posix AIO using pthreads.
3  *
4  * Based on the aio_fork work from Volker and Volker's pthreadpool library.
5  *
6  * Copyright (C) Volker Lendecke 2008
7  * Copyright (C) Jeremy Allison 2012
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include "includes.h"
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "smbd/globals.h"
29 #include "lib/pthreadpool/pthreadpool.h"
30 #ifdef HAVE_LINUX_FALLOC_H
31 #include <linux/falloc.h>
32 #endif
33
34 struct aio_extra;
35 static struct pthreadpool *pool;
36 static int aio_pthread_jobid;
37
struct aio_private_data {
        struct aio_private_data *prev, *next;   /* DLIST links for pd_list. */
        int jobid;              /* Unique id handed to the threadpool. */
        SMB_STRUCT_AIOCB *aiocb;        /* The request; NULLed once the result is collected. */
        ssize_t ret_size;       /* Bytes transferred, or -1 on error. */
        int ret_errno;          /* errno from the IO; EINPROGRESS until the job finishes. */
        bool cancelled;         /* Result must be discarded when it arrives. */
        bool write_command;     /* true => pwrite path, false => pread path. */
        bool flush_write;       /* fsync after a successful write (strict sync). */
};
48
49 /* List of outstanding requests we have. */
50 static struct aio_private_data *pd_list;
51
52 static void aio_pthread_handle_completion(struct event_context *event_ctx,
53                                 struct fd_event *event,
54                                 uint16 flags,
55                                 void *p);
56
57
58 /************************************************************************
59  Ensure thread pool is initialized.
60 ***********************************************************************/
61
62 static bool init_aio_threadpool(struct event_context *ev_ctx,
63                                 struct pthreadpool **pp_pool,
64                                 void (*completion_fn)(struct event_context *,
65                                                 struct fd_event *,
66                                                 uint16,
67                                                 void *))
68 {
69         struct fd_event *sock_event = NULL;
70         int ret = 0;
71
72         if (*pp_pool) {
73                 return true;
74         }
75
76         ret = pthreadpool_init(aio_pending_size, pp_pool);
77         if (ret) {
78                 errno = ret;
79                 return false;
80         }
81         sock_event = tevent_add_fd(ev_ctx,
82                                 NULL,
83                                 pthreadpool_signal_fd(*pp_pool),
84                                 TEVENT_FD_READ,
85                                 completion_fn,
86                                 NULL);
87         if (sock_event == NULL) {
88                 pthreadpool_destroy(*pp_pool);
89                 *pp_pool = NULL;
90                 return false;
91         }
92
93         DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
94                   aio_pending_size));
95
96         return true;
97 }
98
99
100 /************************************************************************
101  Worker function - core of the pthread aio engine.
102  This is the function that actually does the IO.
103 ***********************************************************************/
104
105 static void aio_worker(void *private_data)
106 {
107         struct aio_private_data *pd =
108                         (struct aio_private_data *)private_data;
109
110         if (pd->write_command) {
111                 pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
112                                 (const void *)pd->aiocb->aio_buf,
113                                 pd->aiocb->aio_nbytes,
114                                 pd->aiocb->aio_offset);
115                 if (pd->ret_size == -1 && errno == ESPIPE) {
116                         /* Maintain the fiction that pipes can
117                            be seeked (sought?) on. */
118                         pd->ret_size = sys_write(pd->aiocb->aio_fildes,
119                                         (const void *)pd->aiocb->aio_buf,
120                                         pd->aiocb->aio_nbytes);
121                 }
122 #if defined(HAVE_FSYNC)
123                 if (pd->ret_size != -1 && pd->flush_write) {
124                         /*
125                          * Optimization - flush if requested.
126                          * Ignore error as upper layer will
127                          * also do this.
128                          */
129                         (void)fsync(pd->aiocb->aio_fildes);
130                 }
131 #endif
132         } else {
133                 pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
134                                 (void *)pd->aiocb->aio_buf,
135                                 pd->aiocb->aio_nbytes,
136                                 pd->aiocb->aio_offset);
137                 if (pd->ret_size == -1 && errno == ESPIPE) {
138                         /* Maintain the fiction that pipes can
139                            be seeked (sought?) on. */
140                         pd->ret_size = sys_read(pd->aiocb->aio_fildes,
141                                         (void *)pd->aiocb->aio_buf,
142                                         pd->aiocb->aio_nbytes);
143                 }
144         }
145         if (pd->ret_size == -1) {
146                 pd->ret_errno = errno;
147         } else {
148                 pd->ret_errno = 0;
149         }
150 }
151
/************************************************************************
 Private data destructor. Unlinks from pd_list so completion and
 cancel lookups can no longer find this request.
***********************************************************************/

static int pd_destructor(struct aio_private_data *pd)
{
        DLIST_REMOVE(pd_list, pd);
        return 0;
}
161
162 /************************************************************************
163  Create and initialize a private data struct.
164 ***********************************************************************/
165
166 static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
167                                         SMB_STRUCT_AIOCB *aiocb)
168 {
169         struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
170         if (!pd) {
171                 return NULL;
172         }
173         pd->jobid = aio_pthread_jobid++;
174         pd->aiocb = aiocb;
175         pd->ret_size = -1;
176         pd->ret_errno = EINPROGRESS;
177         talloc_set_destructor(pd, pd_destructor);
178         DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
179         return pd;
180 }
181
182 /************************************************************************
183  Spin off a threadpool (if needed) and initiate a pread call.
184 ***********************************************************************/
185
186 static int aio_pthread_read(struct vfs_handle_struct *handle,
187                                 struct files_struct *fsp,
188                                 SMB_STRUCT_AIOCB *aiocb)
189 {
190         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
191         struct aio_private_data *pd = NULL;
192         int ret;
193
194         if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
195                                 &pool,
196                                 aio_pthread_handle_completion)) {
197                 return -1;
198         }
199
200         pd = create_private_data(aio_ex, aiocb);
201         if (pd == NULL) {
202                 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
203                 return -1;
204         }
205
206         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
207         if (ret) {
208                 errno = ret;
209                 return -1;
210         }
211
212         DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
213                 "of %llu bytes at offset %llu\n",
214                 pd->jobid,
215                 (unsigned long long)pd->aiocb->aio_nbytes,
216                 (unsigned long long)pd->aiocb->aio_offset));
217
218         return 0;
219 }
220
221 /************************************************************************
222  Spin off a threadpool (if needed) and initiate a pwrite call.
223 ***********************************************************************/
224
225 static int aio_pthread_write(struct vfs_handle_struct *handle,
226                                 struct files_struct *fsp,
227                                 SMB_STRUCT_AIOCB *aiocb)
228 {
229         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
230         struct aio_private_data *pd = NULL;
231         int ret;
232
233         if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
234                                 &pool,
235                                 aio_pthread_handle_completion)) {
236                 return -1;
237         }
238
239         pd = create_private_data(aio_ex, aiocb);
240         if (pd == NULL) {
241                 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
242                 return -1;
243         }
244
245         pd->write_command = true;
246         if (lp_strict_sync(SNUM(fsp->conn)) &&
247                         (lp_syncalways(SNUM(fsp->conn)) ||
248                                 aio_write_through_requested(aio_ex))) {
249                 pd->flush_write = true;
250         }
251
252
253         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
254         if (ret) {
255                 errno = ret;
256                 return -1;
257         }
258
259         DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
260                 "of %llu bytes at offset %llu\n",
261                 pd->jobid,
262                 (unsigned long long)pd->aiocb->aio_nbytes,
263                 (unsigned long long)pd->aiocb->aio_offset));
264
265         return 0;
266 }
267
268 /************************************************************************
269  Find the private data by jobid.
270 ***********************************************************************/
271
272 static struct aio_private_data *find_private_data_by_jobid(int jobid)
273 {
274         struct aio_private_data *pd;
275
276         for (pd = pd_list; pd != NULL; pd = pd->next) {
277                 if (pd->jobid == jobid) {
278                         return pd;
279                 }
280         }
281
282         return NULL;
283 }
284
/************************************************************************
 Callback when an IO completes.
 Runs in the main smbd event loop when the threadpool's signal fd
 becomes readable: pulls one finished jobid off the pool, matches it
 to its private data and completes the request at the SMB level.
***********************************************************************/

static void aio_pthread_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct aio_extra *aio_ex = NULL;
        struct aio_private_data *pd = NULL;
        int jobid = 0;
        int ret;

        DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
                        (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        /* Consumes the signal byte and returns the finished jobid. */
        ret = pthreadpool_finished_job(pool, &jobid);
        if (ret) {
                smb_panic("aio_pthread_handle_completion");
                return;
        }

        pd = find_private_data_by_jobid(jobid);
        if (pd == NULL) {
                DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
                          jobid));
                return;
        }

        aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
        /* Reply to the client; results are fetched via
           aio_pthread_return_fn/aio_pthread_error_fn from in there. */
        smbd_aio_complete_aio_ex(aio_ex);

        DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
                jobid ));
        /* pd was talloc'ed off aio_ex (see aio_pthread_read/write), so
           this also frees pd and unlinks it from pd_list. */
        TALLOC_FREE(aio_ex);
}
326
327 /************************************************************************
328  Find the private data by aiocb.
329 ***********************************************************************/
330
331 static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
332 {
333         struct aio_private_data *pd;
334
335         for (pd = pd_list; pd != NULL; pd = pd->next) {
336                 if (pd->aiocb == aiocb) {
337                         return pd;
338                 }
339         }
340
341         return NULL;
342 }
343
/************************************************************************
 Called to return the result of a completed AIO.
 Should only be called if aio_error returns something other than EINPROGRESS.
 Returns:
        -1 with errno == EINVAL - unknown aiocb.
        -1 with errno == ECANCELED - request was cancelled.
        Any other value - return from IO operation.
***********************************************************************/

static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

        if (pd == NULL) {
                errno = EINVAL;
                DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
                return -1;
        }

        /* Mark the result as collected - subsequent lookups by this
           aiocb (e.g. from cancel) must no longer match. */
        pd->aiocb = NULL;

        if (pd->cancelled) {
                errno = ECANCELED;
                return -1;
        }

        /* Propagate the worker's saved errno on failure. */
        if (pd->ret_size == -1) {
                errno = pd->ret_errno;
        }

        return pd->ret_size;
}
376
377 /************************************************************************
378  Called to check the result of an AIO.
379  Returns:
380         EINPROGRESS - still in progress.
381         EINVAL - invalid aiocb.
382         ECANCELED - request was cancelled.
383         0 - request completed successfully.
384         Any other value - errno from IO operation.
385 ***********************************************************************/
386
387 static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
388                              struct files_struct *fsp,
389                              SMB_STRUCT_AIOCB *aiocb)
390 {
391         struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
392
393         if (pd == NULL) {
394                 return EINVAL;
395         }
396         if (pd->cancelled) {
397                 return ECANCELED;
398         }
399         return pd->ret_errno;
400 }
401
402 /************************************************************************
403  Called to request the cancel of an AIO, or all of them on a specific
404  fsp if aiocb == NULL.
405 ***********************************************************************/
406
407 static int aio_pthread_cancel(struct vfs_handle_struct *handle,
408                         struct files_struct *fsp,
409                         SMB_STRUCT_AIOCB *aiocb)
410 {
411         struct aio_private_data *pd = NULL;
412
413         for (pd = pd_list; pd != NULL; pd = pd->next) {
414                 if (pd->aiocb == NULL) {
415                         continue;
416                 }
417                 if (pd->aiocb->aio_fildes != fsp->fh->fd) {
418                         continue;
419                 }
420                 if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
421                         continue;
422                 }
423
424                 /*
425                  * We let the child do its job, but we discard the result when
426                  * it's finished.
427                  */
428
429                 pd->cancelled = true;
430         }
431
432         return AIO_CANCELED;
433 }
434
/************************************************************************
 Callback for a previously detected job completion.
 Scheduled onto the main event context by the suspend completion
 handler when a job finishes that the suspend call wasn't waiting for.
***********************************************************************/

static void aio_pthread_handle_immediate(struct tevent_context *ctx,
                                struct tevent_immediate *im,
                                void *private_data)
{
        struct aio_extra *aio_ex = NULL;
        struct aio_private_data *pd = (struct aio_private_data *)private_data;

        aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
        smbd_aio_complete_aio_ex(aio_ex);
        /* Frees pd too - it's talloc'ed off aio_ex. */
        TALLOC_FREE(aio_ex);
}
450
/************************************************************************
 Private data struct used in suspend completion code.
***********************************************************************/

struct suspend_private {
        int num_entries;        /* How many aiocbs we're waiting on. */
        int num_finished;       /* How many of them have completed so far. */
        const SMB_STRUCT_AIOCB * const *aiocb_array;    /* The requests to wait for. */
};
460
/************************************************************************
 Callback when an IO completes from a suspend call.
 Runs on the private sub-event context created by aio_pthread_suspend,
 not the main loop. Completions for jobs the suspend caller isn't
 waiting on must be bounced back to the main event context.
***********************************************************************/

static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct suspend_private *sp = (struct suspend_private *)p;
        struct aio_private_data *pd = NULL;
        struct tevent_immediate *im = NULL;
        int jobid;
        int i;

        DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
                        (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        /* Consumes the signal byte and returns the finished jobid. */
        if (pthreadpool_finished_job(pool, &jobid)) {
                smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
                return;
        }

        pd = find_private_data_by_jobid(jobid);
        if (pd == NULL) {
                DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
                          jobid));
                return;
        }

        /* Is this a jobid with an aiocb we're interested in ? */
        for (i = 0; i < sp->num_entries; i++) {
                if (sp->aiocb_array[i] == pd->aiocb) {
                        sp->num_finished++;
                        return;
                }
        }

        /* Jobid completed we weren't waiting for.
           We must reschedule this as an immediate event
           on the main event context. */
        im = tevent_create_immediate(NULL);
        if (!im) {
                exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
        }

        DEBUG(10,("aio_pthread_handle_suspend_completion: "
                        "re-scheduling job id %d\n",
                        jobid));

        /* Completed later, on the main loop, via
           aio_pthread_handle_immediate. */
        tevent_schedule_immediate(im,
                        server_event_context(),
                        aio_pthread_handle_immediate,
                        (void *)pd);
}
520
521
522 static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
523                                         struct tevent_timer *te,
524                                         struct timeval now,
525                                         void *private_data)
526 {
527         bool *timed_out = (bool *)private_data;
528         /* Remove this timed event handler. */
529         TALLOC_FREE(te);
530         *timed_out = true;
531 }
532
/************************************************************************
 Called to request everything to stop until all IO is completed.
 Blocks inside a private sub-event loop until every aiocb in
 aiocb_array has finished, or the optional timeout fires (in which
 case -1 is returned with errno == EAGAIN).
***********************************************************************/

static int aio_pthread_suspend(struct vfs_handle_struct *handle,
                        struct files_struct *fsp,
                        const SMB_STRUCT_AIOCB * const aiocb_array[],
                        int n,
                        const struct timespec *timeout)
{
        struct event_context *ev = NULL;
        struct fd_event *sock_event = NULL;
        int ret = -1;
        struct suspend_private sp;
        bool timed_out = false;
        TALLOC_CTX *frame = talloc_stackframe();

        /* This is a blocking call, and has to use a sub-event loop. */
        ev = event_context_init(frame);
        if (ev == NULL) {
                errno = ENOMEM;
                goto out;
        }

        if (timeout) {
                struct timeval tv = convert_timespec_to_timeval(*timeout);
                struct tevent_timer *te = tevent_add_timer(ev,
                                                frame,
                                                timeval_current_ofs(tv.tv_sec,
                                                                    tv.tv_usec),
                                                aio_pthread_suspend_timed_out,
                                                &timed_out);
                if (!te) {
                        errno = ENOMEM;
                        goto out;
                }
        }

        ZERO_STRUCT(sp);
        sp.num_entries = n;
        sp.aiocb_array = aiocb_array;
        sp.num_finished = 0;

        /* Watch the pool's signal fd on the private event context.
           Completions for jobs we aren't waiting on are re-dispatched
           to the main loop by the handler. */
        sock_event = tevent_add_fd(ev,
                                frame,
                                pthreadpool_signal_fd(pool),
                                TEVENT_FD_READ,
                                aio_pthread_handle_suspend_completion,
                                (void *)&sp);
        if (sock_event == NULL) {
                pthreadpool_destroy(pool);
                pool = NULL;
                goto out;
        }
        /*
         * We're going to cheat here. We know that smbd/aio.c
         * only calls this when it's waiting for every single
         * outstanding call to finish on a close, so just wait
         * individually for each IO to complete. We don't care
         * what order they finish - only that they all do. JRA.
         */
        while (sp.num_entries != sp.num_finished) {
                if (tevent_loop_once(ev) == -1) {
                        goto out;
                }

                if (timed_out) {
                        errno = EAGAIN;
                        goto out;
                }
        }

        ret = 0;

  out:

        /* Frees ev, the timer and the fd event in one go. */
        TALLOC_FREE(frame);
        return ret;
}
612
613 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
614 /*
615  * We must have openat() to do any thread-based
616  * asynchronous opens. We also must be using
617  * thread-specific credentials (Linux-only
618  * for now).
619  */
620
621 /*
622  * NB. This threadpool is shared over all
623  * instances of this VFS module in this
624  * process, as is the current jobid.
625  */
626
627 static struct pthreadpool *open_pool;
628 static int aio_pthread_open_jobid;
629
struct aio_open_private_data {
        struct aio_open_private_data *prev, *next;      /* DLIST links for open_pd_list. */
        /* Inputs. */
        int jobid;              /* Unique id handed to the open threadpool. */
        int dir_fd;             /* Parent directory fd for openat(). */
        int flags;              /* open() flags. */
        mode_t mode;            /* open() create mode. */
        uint64_t mid;           /* SMB message-id of the deferred open. */
        bool in_progress;       /* Worker still running. */
        const char *fname;      /* Filename relative to dir_fd. */
        char *dname;            /* Parent directory name. */
        struct smbd_server_connection *sconn;   /* For rescheduling the deferred open. */
        const struct security_unix_token *ux_tok;       /* Credentials for the worker thread. */
        uint64_t initial_allocation_size;       /* Requested preallocation (may be 0). */
        /* Returns. */
        int ret_fd;             /* Opened fd, or -1 on error. */
        int ret_errno;          /* errno from openat(); EINPROGRESS until done. */
};
648
649 /* List of outstanding requests we have. */
650 static struct aio_open_private_data *open_pd_list;
651
652 /************************************************************************
653  Find the open private data by jobid.
654 ***********************************************************************/
655
656 static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
657 {
658         struct aio_open_private_data *opd;
659
660         for (opd = open_pd_list; opd != NULL; opd = opd->next) {
661                 if (opd->jobid == jobid) {
662                         return opd;
663                 }
664         }
665
666         return NULL;
667 }
668
669 /************************************************************************
670  Find the open private data by mid.
671 ***********************************************************************/
672
673 static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
674 {
675         struct aio_open_private_data *opd;
676
677         for (opd = open_pd_list; opd != NULL; opd = opd->next) {
678                 if (opd->mid == mid) {
679                         return opd;
680                 }
681         }
682
683         return NULL;
684 }
685
/************************************************************************
 Callback when an open completes.
 Runs in the main smbd event loop when the open pool's signal fd
 becomes readable: matches the finished job to its private data and
 reschedules the deferred SMB open request.
***********************************************************************/

static void aio_open_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct aio_open_private_data *opd = NULL;
        int jobid = 0;
        int ret;

        DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
                (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        /* Consumes the signal byte and returns the finished jobid. */
        ret = pthreadpool_finished_job(open_pool, &jobid);
        if (ret) {
                smb_panic("aio_open_handle_completion");
                /* notreached. */
                return;
        }

        opd = find_open_private_data_by_jobid(jobid);
        if (opd == NULL) {
                DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
                        jobid));
                smb_panic("aio_open_handle_completion - no jobid");
                /* notreached. */
                return;
        }

        DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
                "for file %s/%s completed\n",
                jobid,
                (unsigned long long)opd->mid,
                opd->dname,
                opd->fname));

        opd->in_progress = false;

        /* Find outstanding event and reschedule. */
        if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
                /*
                 * Outstanding event didn't exist or was
                 * cancelled. Free up the fd and throw
                 * away the result.
                 */
                if (opd->ret_fd != -1) {
                        close(opd->ret_fd);
                        opd->ret_fd = -1;
                }
                TALLOC_FREE(opd);
        }
}
745
/*****************************************************************
 The core of the async open code - the worker function. Note we
 use the new openat() system call to avoid any problems with
 current working directory changes plus we change credentials
 on the thread to prevent any security race conditions.
 Runs on a pool thread; results go into opd->ret_fd/ret_errno.
*****************************************************************/

static void aio_open_worker(void *private_data)
{
        struct aio_open_private_data *opd =
                (struct aio_open_private_data *)private_data;

        /* Become the correct credential on this thread.
           NOTE(review): set_thread_credentials relies on per-thread
           credential syscalls (Linux-only per the #ifdef gate above). */
        if (set_thread_credentials(opd->ux_tok->uid,
                                opd->ux_tok->gid,
                                (size_t)opd->ux_tok->ngroups,
                                opd->ux_tok->groups) != 0) {
                opd->ret_fd = -1;
                opd->ret_errno = errno;
                return;
        }

        /* Open relative to the already-held parent directory fd, so a
           concurrent rename of an ancestor can't redirect us. */
        opd->ret_fd = openat(opd->dir_fd,
                        opd->fname,
                        opd->flags,
                        opd->mode);

        if (opd->ret_fd == -1) {
                opd->ret_errno = errno;
        } else {
                /* Create was successful. */
                opd->ret_errno = 0;

#if defined(HAVE_LINUX_FALLOCATE)
                /*
                 * See if we can set the initial
                 * allocation size. We don't record
                 * the return for this as it's an
                 * optimization - the upper layer
                 * will also do this for us once
                 * the open returns.
                 */
                if (opd->initial_allocation_size) {
                        (void)fallocate(opd->ret_fd,
                                        FALLOC_FL_KEEP_SIZE,
                                        0,
                                        (off_t)opd->initial_allocation_size);
                }
#endif
        }
}
797
/************************************************************************
 Open private data destructor. Releases the parent directory fd and
 unlinks from open_pd_list.
***********************************************************************/

static int opd_destructor(struct aio_open_private_data *opd)
{
        if (opd->dir_fd != -1) {
                close(opd->dir_fd);
        }
        DLIST_REMOVE(open_pd_list, opd);
        return 0;
}
810
/************************************************************************
 Create and initialize a private data struct for async open.
 Snapshots the current credentials, splits the target path into
 parent directory + relative name, and opens the parent directory
 so the worker can use openat(). Returns NULL on any failure.
***********************************************************************/

static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
                                        int flags,
                                        mode_t mode)
{
        struct aio_open_private_data *opd = talloc_zero(NULL,
                                        struct aio_open_private_data);
        const char *fname = NULL;

        if (!opd) {
                return NULL;
        }

        opd->jobid = aio_pthread_open_jobid++;
        opd->dir_fd = -1;
        /* Not finished yet: fail-by-default until the worker runs. */
        opd->ret_fd = -1;
        opd->ret_errno = EINPROGRESS;
        opd->flags = flags;
        opd->mode = mode;
        opd->mid = fsp->mid;
        opd->in_progress = true;
        opd->sconn = fsp->conn->sconn;
        opd->initial_allocation_size = fsp->initial_allocation_size;

        /* Copy our current credentials. */
        opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
        if (opd->ux_tok == NULL) {
                TALLOC_FREE(opd);
                return NULL;
        }

        /*
         * Copy the parent directory name and the
         * relative path within it.
         */
        if (parent_dirname(opd,
                        fsp->fsp_name->base_name,
                        &opd->dname,
                        &fname) == false) {
                TALLOC_FREE(opd);
                return NULL;
        }
        opd->fname = talloc_strdup(opd, fname);
        if (opd->fname == NULL) {
                TALLOC_FREE(opd);
                return NULL;
        }

        /* Hold the parent directory open for the worker's openat(). */
#if defined(O_DIRECTORY)
        opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
#else
        opd->dir_fd = open(opd->dname, O_RDONLY);
#endif
        if (opd->dir_fd == -1) {
                TALLOC_FREE(opd);
                return NULL;
        }

        /* Destructor closes dir_fd and unlinks from open_pd_list. */
        talloc_set_destructor(opd, opd_destructor);
        DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
        return opd;
}
876
877 /*****************************************************************
878  Setup an async open.
879 *****************************************************************/
880
881 static int open_async(const files_struct *fsp,
882                         int flags,
883                         mode_t mode)
884 {
885         struct aio_open_private_data *opd = NULL;
886         int ret;
887
888         if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
889                         &open_pool,
890                         aio_open_handle_completion)) {
891                 return -1;
892         }
893
894         opd = create_private_open_data(fsp, flags, mode);
895         if (opd == NULL) {
896                 DEBUG(10, ("open_async: Could not create private data.\n"));
897                 return -1;
898         }
899
900         ret = pthreadpool_add_job(open_pool,
901                                 opd->jobid,
902                                 aio_open_worker,
903                                 (void *)opd);
904         if (ret) {
905                 errno = ret;
906                 return -1;
907         }
908
909         DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
910                 (unsigned long long)opd->mid,
911                 opd->jobid,
912                 opd->dname,
913                 opd->fname));
914
915         /* Cause the calling code to reschedule us. */
916         errno = EINTR; /* Maps to NT_STATUS_RETRY. */
917         return -1;
918 }
919
920 /*****************************************************************
921  Look for a matching SMB2 mid. If we find it we're rescheduled,
922  just return the completed open.
923 *****************************************************************/
924
static bool find_completed_open(files_struct *fsp,
				int *p_fd,
				int *p_errno)
{
	/*
	 * Look up a previously queued async open by this fsp's
	 * SMB2 mid. Returns false if no async open is pending for
	 * the mid (i.e. this is a fresh open, not a reschedule).
	 * On true, *p_fd and *p_errno carry the worker's results
	 * and the pending-open record is freed.
	 */
	struct aio_open_private_data *opd;

	opd = find_open_private_data_by_mid(fsp->mid);
	if (!opd) {
		return false;
	}

	if (opd->in_progress) {
		/*
		 * We were rescheduled before the worker thread
		 * marked the open complete — this should be
		 * impossible, so treat it as fatal.
		 */
		DEBUG(0,("find_completed_open: mid %llu "
			"jobid %d still in progress for "
			"file %s/%s. PANIC !\n",
			(unsigned long long)opd->mid,
			opd->jobid,
			opd->dname,
			opd->fname));
		/* Disaster ! This is an open timeout. Just panic. */
		smb_panic("find_completed_open - in_progress\n");
		/* notreached. */
		return false;
	}

	/* Hand the worker's open(2) result back to the caller. */
	*p_fd = opd->ret_fd;
	*p_errno = opd->ret_errno;

	DEBUG(5,("find_completed_open: mid %llu returning "
		"fd = %d, errno = %d (%s) "
		"jobid (%d) for file %s\n",
		(unsigned long long)opd->mid,
		opd->ret_fd,
		opd->ret_errno,
		strerror(opd->ret_errno),
		opd->jobid,
		smb_fname_str_dbg(fsp->fsp_name)));

	/* Now we can free the opd. */
	TALLOC_FREE(opd);
	return true;
}
967
968 /*****************************************************************
969  The core open function. Only go async on O_CREAT|O_EXCL
970  opens to prevent any race conditions.
971 *****************************************************************/
972
973 static int aio_pthread_open_fn(vfs_handle_struct *handle,
974                         struct smb_filename *smb_fname,
975                         files_struct *fsp,
976                         int flags,
977                         mode_t mode)
978 {
979         int my_errno = 0;
980         int fd = -1;
981         bool aio_allow_open = lp_parm_bool(
982                 SNUM(handle->conn), "aio_pthread", "aio open", false);
983
984         if (smb_fname->stream_name) {
985                 /* Don't handle stream opens. */
986                 errno = ENOENT;
987                 return -1;
988         }
989
990         if (!aio_allow_open) {
991                 /* aio opens turned off. */
992                 return open(smb_fname->base_name, flags, mode);
993         }
994
995         if (!(flags & O_CREAT)) {
996                 /* Only creates matter. */
997                 return open(smb_fname->base_name, flags, mode);
998         }
999
1000         if (!(flags & O_EXCL)) {
1001                 /* Only creates with O_EXCL matter. */
1002                 return open(smb_fname->base_name, flags, mode);
1003         }
1004
1005         /*
1006          * See if this is a reentrant call - i.e. is this a
1007          * restart of an existing open that just completed.
1008          */
1009
1010         if (find_completed_open(fsp,
1011                                 &fd,
1012                                 &my_errno)) {
1013                 errno = my_errno;
1014                 return fd;
1015         }
1016
1017         /* Ok, it's a create exclusive call - pass it to a thread helper. */
1018         return open_async(fsp, flags, mode);
1019 }
1020 #endif
1021
1022 static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
1023                                const char *user)
1024 {
1025         /*********************************************************************
1026          * How many threads to initialize ?
1027          * 100 per process seems insane as a default until you realize that
1028          * (a) Threads terminate after 1 second when idle.
1029          * (b) Throttling is done in SMB2 via the crediting algorithm.
1030          * (c) SMB1 clients are limited to max_mux (50) outstanding
1031          *     requests and Windows clients don't use this anyway.
1032          * Essentially we want this to be unlimited unless smb.conf
1033          * says different.
1034          *********************************************************************/
1035         aio_pending_size = lp_parm_int(
1036                 SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
1037         return SMB_VFS_NEXT_CONNECT(handle, service, user);
1038 }
1039
/*
 * VFS operations this module overrides; all other operations
 * fall through to the next module in the chain.
 */
static struct vfs_fn_pointers vfs_aio_pthread_fns = {
	.connect_fn = aio_pthread_connect,
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
	/* Async open support needs openat() and per-thread credentials. */
	.open_fn = aio_pthread_open_fn,
#endif
	.aio_read_fn = aio_pthread_read,
	.aio_write_fn = aio_pthread_write,
	.aio_return_fn = aio_pthread_return_fn,
	.aio_cancel_fn = aio_pthread_cancel,
	.aio_error_fn = aio_pthread_error_fn,
	.aio_suspend_fn = aio_pthread_suspend,
};
1052
1053 NTSTATUS vfs_aio_pthread_init(void);
1054 NTSTATUS vfs_aio_pthread_init(void)
1055 {
1056         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
1057                                 "aio_pthread", &vfs_aio_pthread_fns);
1058 }