Add in the threaded async open engine.
[samba.git] / source3 / modules / vfs_aio_pthread.c
1 /*
2  * Simulate Posix AIO using pthreads.
3  *
4  * Based on the aio_fork work from Volker and Volker's pthreadpool library.
5  *
6  * Copyright (C) Volker Lendecke 2008
7  * Copyright (C) Jeremy Allison 2012
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include "includes.h"
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "smbd/globals.h"
29 #include "lib/pthreadpool/pthreadpool.h"
30
/* Opaque per-request context owned by smbd/aio.c. */
struct aio_extra;

/* Thread pool shared by every instance of this module in the process. */
static struct pthreadpool *pool;

/* Monotonically increasing id handed to pthreadpool_add_job(). */
static int aio_pthread_jobid;

/*
 * Tracking state for one outstanding read/write submitted to the
 * thread pool. Talloc'd as a child of the request's aio_extra.
 */
struct aio_private_data {
	struct aio_private_data *prev, *next;	/* pd_list linkage. */
	int jobid;			/* Key used to match pool completions. */
	SMB_STRUCT_AIOCB *aiocb;	/* Caller's control block; NULL once reaped. */
	ssize_t ret_size;		/* Bytes transferred, or -1 on error. */
	int ret_errno;			/* Worker's errno; EINPROGRESS until done. */
	bool cancelled;			/* Result must be discarded when true. */
	bool write_command;		/* true => pwrite path, false => pread. */
};

/* List of outstanding requests we have. */
static struct aio_private_data *pd_list;

static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p);
52
53
54 /************************************************************************
55  Ensure thread pool is initialized.
56 ***********************************************************************/
57
58 static bool init_aio_threadpool(struct event_context *ev_ctx,
59                                 struct pthreadpool **pp_pool,
60                                 void (*completion_fn)(struct event_context *,
61                                                 struct fd_event *,
62                                                 uint16,
63                                                 void *))
64 {
65         struct fd_event *sock_event = NULL;
66         int ret = 0;
67
68         if (*pp_pool) {
69                 return true;
70         }
71
72         ret = pthreadpool_init(aio_pending_size, pp_pool);
73         if (ret) {
74                 errno = ret;
75                 return false;
76         }
77         sock_event = tevent_add_fd(ev_ctx,
78                                 NULL,
79                                 pthreadpool_signal_fd(*pp_pool),
80                                 TEVENT_FD_READ,
81                                 completion_fn,
82                                 NULL);
83         if (sock_event == NULL) {
84                 pthreadpool_destroy(*pp_pool);
85                 *pp_pool = NULL;
86                 return false;
87         }
88
89         DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
90                   aio_pending_size));
91
92         return true;
93 }
94
95
96 /************************************************************************
97  Worker function - core of the pthread aio engine.
98  This is the function that actually does the IO.
99 ***********************************************************************/
100
101 static void aio_worker(void *private_data)
102 {
103         struct aio_private_data *pd =
104                         (struct aio_private_data *)private_data;
105
106         if (pd->write_command) {
107                 pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
108                                 (const void *)pd->aiocb->aio_buf,
109                                 pd->aiocb->aio_nbytes,
110                                 pd->aiocb->aio_offset);
111                 if (pd->ret_size == -1 && errno == ESPIPE) {
112                         /* Maintain the fiction that pipes can
113                            be seeked (sought?) on. */
114                         pd->ret_size = sys_write(pd->aiocb->aio_fildes,
115                                         (const void *)pd->aiocb->aio_buf,
116                                         pd->aiocb->aio_nbytes);
117                 }
118         } else {
119                 pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
120                                 (void *)pd->aiocb->aio_buf,
121                                 pd->aiocb->aio_nbytes,
122                                 pd->aiocb->aio_offset);
123                 if (pd->ret_size == -1 && errno == ESPIPE) {
124                         /* Maintain the fiction that pipes can
125                            be seeked (sought?) on. */
126                         pd->ret_size = sys_read(pd->aiocb->aio_fildes,
127                                         (void *)pd->aiocb->aio_buf,
128                                         pd->aiocb->aio_nbytes);
129                 }
130         }
131         if (pd->ret_size == -1) {
132                 pd->ret_errno = errno;
133         } else {
134                 pd->ret_errno = 0;
135         }
136 }
137
138 /************************************************************************
139  Private data destructor.
140 ***********************************************************************/
141
/*
 * Talloc destructor: unlink from the outstanding-request list so the
 * completion handlers can no longer find this (about to be freed) entry.
 */
static int pd_destructor(struct aio_private_data *pd)
{
	DLIST_REMOVE(pd_list, pd);
	return 0;
}
147
148 /************************************************************************
149  Create and initialize a private data struct.
150 ***********************************************************************/
151
152 static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
153                                         SMB_STRUCT_AIOCB *aiocb)
154 {
155         struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
156         if (!pd) {
157                 return NULL;
158         }
159         pd->jobid = aio_pthread_jobid++;
160         pd->aiocb = aiocb;
161         pd->ret_size = -1;
162         pd->ret_errno = EINPROGRESS;
163         talloc_set_destructor(pd, pd_destructor);
164         DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
165         return pd;
166 }
167
168 /************************************************************************
169  Spin off a threadpool (if needed) and initiate a pread call.
170 ***********************************************************************/
171
172 static int aio_pthread_read(struct vfs_handle_struct *handle,
173                                 struct files_struct *fsp,
174                                 SMB_STRUCT_AIOCB *aiocb)
175 {
176         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
177         struct aio_private_data *pd = NULL;
178         int ret;
179
180         if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
181                                 &pool,
182                                 aio_pthread_handle_completion)) {
183                 return -1;
184         }
185
186         pd = create_private_data(aio_ex, aiocb);
187         if (pd == NULL) {
188                 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
189                 return -1;
190         }
191
192         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
193         if (ret) {
194                 errno = ret;
195                 return -1;
196         }
197
198         DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
199                 "of %llu bytes at offset %llu\n",
200                 pd->jobid,
201                 (unsigned long long)pd->aiocb->aio_nbytes,
202                 (unsigned long long)pd->aiocb->aio_offset));
203
204         return 0;
205 }
206
207 /************************************************************************
208  Spin off a threadpool (if needed) and initiate a pwrite call.
209 ***********************************************************************/
210
211 static int aio_pthread_write(struct vfs_handle_struct *handle,
212                                 struct files_struct *fsp,
213                                 SMB_STRUCT_AIOCB *aiocb)
214 {
215         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
216         struct aio_private_data *pd = NULL;
217         int ret;
218
219         if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
220                                 &pool,
221                                 aio_pthread_handle_completion)) {
222                 return -1;
223         }
224
225         pd = create_private_data(aio_ex, aiocb);
226         if (pd == NULL) {
227                 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
228                 return -1;
229         }
230
231         pd->write_command = true;
232
233         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
234         if (ret) {
235                 errno = ret;
236                 return -1;
237         }
238
239         DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
240                 "of %llu bytes at offset %llu\n",
241                 pd->jobid,
242                 (unsigned long long)pd->aiocb->aio_nbytes,
243                 (unsigned long long)pd->aiocb->aio_offset));
244
245         return 0;
246 }
247
248 /************************************************************************
249  Find the private data by jobid.
250 ***********************************************************************/
251
252 static struct aio_private_data *find_private_data_by_jobid(int jobid)
253 {
254         struct aio_private_data *pd;
255
256         for (pd = pd_list; pd != NULL; pd = pd->next) {
257                 if (pd->jobid == jobid) {
258                         return pd;
259                 }
260         }
261
262         return NULL;
263 }
264
265 /************************************************************************
266  Callback when an IO completes.
267 ***********************************************************************/
268
/*
 * Fired by tevent when the pool's signal fd becomes readable, i.e. a
 * worker thread has finished its IO. Reaps one job, looks up its
 * tracking entry and completes the SMB-level request.
 */
static void aio_pthread_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct aio_extra *aio_ex = NULL;
	struct aio_private_data *pd = NULL;
	int jobid = 0;
	int ret;

	DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	/* Pull one finished jobid off the pool's completion pipe. */
	ret = pthreadpool_finished_job(pool, &jobid);
	if (ret) {
		smb_panic("aio_pthread_handle_completion");
		return;
	}

	pd = find_private_data_by_jobid(jobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
			  jobid));
		return;
	}

	/*
	 * Hand the result back to the core aio code. Freeing aio_ex
	 * below also frees pd (its talloc child, see create_private_data)
	 * and the destructor unlinks it from pd_list.
	 */
	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);

	DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
		jobid ));
	TALLOC_FREE(aio_ex);
}
306
307 /************************************************************************
308  Find the private data by aiocb.
309 ***********************************************************************/
310
311 static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
312 {
313         struct aio_private_data *pd;
314
315         for (pd = pd_list; pd != NULL; pd = pd->next) {
316                 if (pd->aiocb == aiocb) {
317                         return pd;
318                 }
319         }
320
321         return NULL;
322 }
323
324 /************************************************************************
325  Called to return the result of a completed AIO.
326  Should only be called if aio_error returns something other than EINPROGRESS.
327  Returns:
328         Any other value - return from IO operation.
329 ***********************************************************************/
330
static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
				struct files_struct *fsp,
				SMB_STRUCT_AIOCB *aiocb)
{
	struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

	if (pd == NULL) {
		errno = EINVAL;
		DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
		return -1;
	}

	/*
	 * Mark the result as reaped - with aiocb cleared the entry can
	 * no longer be found by find_private_data_by_aiocb or matched
	 * by aio_pthread_cancel.
	 */
	pd->aiocb = NULL;

	if (pd->cancelled) {
		errno = ECANCELED;
		return -1;
	}

	/* On IO failure propagate the errno saved by the worker thread. */
	if (pd->ret_size == -1) {
		errno = pd->ret_errno;
	}

	return pd->ret_size;
}
356
357 /************************************************************************
358  Called to check the result of an AIO.
359  Returns:
360         EINPROGRESS - still in progress.
361         EINVAL - invalid aiocb.
362         ECANCELED - request was cancelled.
363         0 - request completed successfully.
364         Any other value - errno from IO operation.
365 ***********************************************************************/
366
367 static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
368                              struct files_struct *fsp,
369                              SMB_STRUCT_AIOCB *aiocb)
370 {
371         struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
372
373         if (pd == NULL) {
374                 return EINVAL;
375         }
376         if (pd->cancelled) {
377                 return ECANCELED;
378         }
379         return pd->ret_errno;
380 }
381
382 /************************************************************************
383  Called to request the cancel of an AIO, or all of them on a specific
384  fsp if aiocb == NULL.
385 ***********************************************************************/
386
387 static int aio_pthread_cancel(struct vfs_handle_struct *handle,
388                         struct files_struct *fsp,
389                         SMB_STRUCT_AIOCB *aiocb)
390 {
391         struct aio_private_data *pd = NULL;
392
393         for (pd = pd_list; pd != NULL; pd = pd->next) {
394                 if (pd->aiocb == NULL) {
395                         continue;
396                 }
397                 if (pd->aiocb->aio_fildes != fsp->fh->fd) {
398                         continue;
399                 }
400                 if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
401                         continue;
402                 }
403
404                 /*
405                  * We let the child do its job, but we discard the result when
406                  * it's finished.
407                  */
408
409                 pd->cancelled = true;
410         }
411
412         return AIO_CANCELED;
413 }
414
415 /************************************************************************
416  Callback for a previously detected job completion.
417 ***********************************************************************/
418
/*
 * Immediate-event callback used by the suspend path to complete a job
 * that finished while we were blocked waiting for other IOs on the
 * private sub-event context.
 */
static void aio_pthread_handle_immediate(struct tevent_context *ctx,
				struct tevent_immediate *im,
				void *private_data)
{
	struct aio_extra *aio_ex = NULL;
	struct aio_private_data *pd = (struct aio_private_data *)private_data;

	aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
	smbd_aio_complete_aio_ex(aio_ex);
	TALLOC_FREE(aio_ex);
}
430
431 /************************************************************************
432  Private data struct used in suspend completion code.
433 ***********************************************************************/
434
/* State shared with the completion handler during a blocking suspend. */
struct suspend_private {
	int num_entries;	/* Number of aiocbs being waited on. */
	int num_finished;	/* How many of those have completed so far. */
	const SMB_STRUCT_AIOCB * const *aiocb_array;	/* The awaited aiocbs. */
};
440
441 /************************************************************************
442  Callback when an IO completes from a suspend call.
443 ***********************************************************************/
444
/*
 * Fired on the private sub-event context used by aio_pthread_suspend()
 * when a worker finishes. If the finished job is one the suspend call
 * is waiting for, count it; otherwise re-queue its completion as an
 * immediate event on the main event context so it isn't lost.
 */
static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct suspend_private *sp = (struct suspend_private *)p;
	struct aio_private_data *pd = NULL;
	struct tevent_immediate *im = NULL;
	int jobid;
	int i;

	DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
			(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	if (pthreadpool_finished_job(pool, &jobid)) {
		smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
		return;
	}

	pd = find_private_data_by_jobid(jobid);
	if (pd == NULL) {
		DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
			  jobid));
		return;
	}

	/* Is this a jobid with an aiocb we're interested in ? */
	for (i = 0; i < sp->num_entries; i++) {
		if (sp->aiocb_array[i] == pd->aiocb) {
			sp->num_finished++;
			return;
		}
	}

	/* Jobid completed we weren't waiting for.
	   We must reschedule this as an immediate event
	   on the main event context. */
	/* NOTE(review): im is parented on the NULL talloc context and is
	   not explicitly freed here - confirm tevent owns its lifetime. */
	im = tevent_create_immediate(NULL);
	if (!im) {
		exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
	}

	DEBUG(10,("aio_pthread_handle_suspend_completion: "
			"re-scheduling job id %d\n",
			jobid));

	tevent_schedule_immediate(im,
			server_event_context(),
			aio_pthread_handle_immediate,
			(void *)pd);
}
500
501
/*
 * Timer callback for the suspend path: flag that the wait has timed
 * out so the loop in aio_pthread_suspend() can bail with EAGAIN.
 */
static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
					struct tevent_timer *te,
					struct timeval now,
					void *private_data)
{
	bool *timed_out = (bool *)private_data;
	/* Remove this timed event handler. */
	TALLOC_FREE(te);
	*timed_out = true;
}
512
513 /************************************************************************
514  Called to request everything to stop until all IO is completed.
515 ***********************************************************************/
516
/*
 * Block until all n aiocbs in aiocb_array have completed (or the
 * optional timeout expires, -> EAGAIN). Runs a private sub-event loop
 * so unrelated events are not dispatched while we wait; completions we
 * are not waiting for are rescheduled onto the main loop by
 * aio_pthread_handle_suspend_completion().
 * Returns 0 on success, -1 with errno set on failure.
 */
static int aio_pthread_suspend(struct vfs_handle_struct *handle,
			struct files_struct *fsp,
			const SMB_STRUCT_AIOCB * const aiocb_array[],
			int n,
			const struct timespec *timeout)
{
	struct event_context *ev = NULL;
	struct fd_event *sock_event = NULL;
	int ret = -1;
	struct suspend_private sp;
	bool timed_out = false;
	TALLOC_CTX *frame = talloc_stackframe();

	/* This is a blocking call, and has to use a sub-event loop. */
	ev = event_context_init(frame);
	if (ev == NULL) {
		errno = ENOMEM;
		goto out;
	}

	if (timeout) {
		/* Arm a one-shot timer that sets timed_out for us. */
		struct timeval tv = convert_timespec_to_timeval(*timeout);
		struct tevent_timer *te = tevent_add_timer(ev,
						frame,
						timeval_current_ofs(tv.tv_sec,
								    tv.tv_usec),
						aio_pthread_suspend_timed_out,
						&timed_out);
		if (!te) {
			errno = ENOMEM;
			goto out;
		}
	}

	ZERO_STRUCT(sp);
	sp.num_entries = n;
	sp.aiocb_array = aiocb_array;
	sp.num_finished = 0;

	/* Watch the pool's signal fd on the private event context. */
	sock_event = tevent_add_fd(ev,
				frame,
				pthreadpool_signal_fd(pool),
				TEVENT_FD_READ,
				aio_pthread_handle_suspend_completion,
				(void *)&sp);
	if (sock_event == NULL) {
		pthreadpool_destroy(pool);
		pool = NULL;
		goto out;
	}
	/*
	 * We're going to cheat here. We know that smbd/aio.c
	 * only calls this when it's waiting for every single
	 * outstanding call to finish on a close, so just wait
	 * individually for each IO to complete. We don't care
	 * what order they finish - only that they all do. JRA.
	 */
	while (sp.num_entries != sp.num_finished) {
		if (tevent_loop_once(ev) == -1) {
			goto out;
		}

		if (timed_out) {
			errno = EAGAIN;
			goto out;
		}
	}

	ret = 0;

  out:

	/* Frees ev, the timer and the fd event in one shot. */
	TALLOC_FREE(frame);
	return ret;
}
592
593 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
594 /*
595  * We must have openat() to do any thread-based
596  * asynchronous opens. We also must be using
597  * thread-specific credentials (Linux-only
598  * for now).
599  */
600
601 /*
602  * NB. This threadpool is shared over all
603  * instances of this VFS module in this
604  * process, as is the current jobid.
605  */
606
/* Thread pool shared by all async opens in this process. */
static struct pthreadpool *open_pool;

/* Jobid counter for open_pool submissions. */
static int aio_pthread_open_jobid;

/*
 * Tracking state for one in-flight async open. Allocated on the NULL
 * talloc context and freed explicitly (see opd_destructor for the
 * cleanup performed on free).
 */
struct aio_open_private_data {
	struct aio_open_private_data *prev, *next;	/* open_pd_list linkage. */
	/* Inputs. */
	int jobid;		/* Pool job identifier. */
	int dir_fd;		/* Parent directory fd for openat(). */
	int flags;		/* open() flags. */
	mode_t mode;		/* open() create mode. */
	uint64_t mid;		/* SMB message-id used to find this entry later. */
	bool in_progress;	/* Cleared by the completion handler. */
	const char *fname;	/* Path relative to dname. */
	char *dname;		/* Parent directory name. */
	struct smbd_server_connection *sconn;	/* For rescheduling the deferred open. */
	const struct security_unix_token *ux_tok;	/* Credentials for the worker thread. */
	/* Returns. */
	int ret_fd;		/* Resulting fd, or -1. */
	int ret_errno;		/* errno from openat; EINPROGRESS until done. */
};

/* List of outstanding requests we have. */
static struct aio_open_private_data *open_pd_list;
630
631 /************************************************************************
632  Find the open private data by jobid.
633 ***********************************************************************/
634
635 static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
636 {
637         struct aio_open_private_data *opd;
638
639         for (opd = open_pd_list; opd != NULL; opd = opd->next) {
640                 if (opd->jobid == jobid) {
641                         return opd;
642                 }
643         }
644
645         return NULL;
646 }
647
648 /************************************************************************
649  Find the open private data by mid.
650 ***********************************************************************/
651
652 static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
653 {
654         struct aio_open_private_data *opd;
655
656         for (opd = open_pd_list; opd != NULL; opd = opd->next) {
657                 if (opd->mid == mid) {
658                         return opd;
659                 }
660         }
661
662         return NULL;
663 }
664
665 /************************************************************************
666  Callback when an open completes.
667 ***********************************************************************/
668
/*
 * Fired when a worker thread finishes an async open. Marks the entry
 * as no longer in progress and reschedules the deferred SMB open
 * request; if that request vanished, the fd and entry are discarded.
 */
static void aio_open_handle_completion(struct event_context *event_ctx,
				struct fd_event *event,
				uint16 flags,
				void *p)
{
	struct aio_open_private_data *opd = NULL;
	int jobid = 0;
	int ret;

	DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
		(int)flags));

	if ((flags & EVENT_FD_READ) == 0) {
		return;
	}

	/* Reap one finished jobid from the open pool. */
	ret = pthreadpool_finished_job(open_pool, &jobid);
	if (ret) {
		smb_panic("aio_open_handle_completion");
		/* notreached. */
		return;
	}

	opd = find_open_private_data_by_jobid(jobid);
	if (opd == NULL) {
		DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
			jobid));
		smb_panic("aio_open_handle_completion - no jobid");
		/* notreached. */
		return;
	}

	DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
		"for file %s/%s completed\n",
		jobid,
		(unsigned long long)opd->mid,
		opd->dname,
		opd->fname));

	opd->in_progress = false;

	/* Find outstanding event and reschdule. */
	if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
		/*
		 * Outstanding event didn't exist or was
		 * cancelled. Free up the fd and throw
		 * away the result.
		 */
		if (opd->ret_fd != -1) {
			close(opd->ret_fd);
			opd->ret_fd = -1;
		}
		TALLOC_FREE(opd);
	}
}
724
725 /*****************************************************************
726  The core of the async open code - the worker function. Note we
727  use the new openat() system call to avoid any problems with
728  current working directory changes plus we change credentials
729  on the thread to prevent any security race conditions.
730 *****************************************************************/
731
static void aio_open_worker(void *private_data)
{
	struct aio_open_private_data *opd =
		(struct aio_open_private_data *)private_data;

	/*
	 * Become the correct credential on this thread. This must
	 * happen before the openat() so the open is performed with
	 * the requesting user's privileges, not the process's.
	 */
	if (set_thread_credentials(opd->ux_tok->uid,
				opd->ux_tok->gid,
				(size_t)opd->ux_tok->ngroups,
				opd->ux_tok->groups) != 0) {
		opd->ret_fd = -1;
		opd->ret_errno = errno;
		return;
	}

	/* openat() relative to the saved directory fd makes the open
	   immune to cwd changes in the main process. */
	opd->ret_fd = openat(opd->dir_fd,
			opd->fname,
			opd->flags,
			opd->mode);

	if (opd->ret_fd == -1) {
		opd->ret_errno = errno;
	} else {
		/* Create was successful. */
		opd->ret_errno = 0;
	}
}
759
760 /************************************************************************
761  Open private data destructor.
762 ***********************************************************************/
763
/*
 * Talloc destructor: release the parent-directory fd (if open) and
 * unlink this entry from the outstanding async-open list.
 */
static int opd_destructor(struct aio_open_private_data *opd)
{
	if (opd->dir_fd != -1) {
		close(opd->dir_fd);
	}
	DLIST_REMOVE(open_pd_list, opd);
	return 0;
}
772
773 /************************************************************************
774  Create and initialize a private data struct for async open.
775 ***********************************************************************/
776
/*
 * Build the tracking entry for an async open: snapshot the caller's
 * credentials, split the target path into parent directory + relative
 * name, and open the parent directory so the worker can use openat().
 * Returns NULL on any failure (everything allocated is freed again).
 */
static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
					int flags,
					mode_t mode)
{
	struct aio_open_private_data *opd = talloc_zero(NULL,
					struct aio_open_private_data);
	const char *fname = NULL;

	if (!opd) {
		return NULL;
	}

	opd->jobid = aio_pthread_open_jobid++;
	opd->dir_fd = -1;
	opd->ret_fd = -1;
	opd->ret_errno = EINPROGRESS;	/* Not finished yet. */
	opd->flags = flags;
	opd->mode = mode;
	opd->mid = fsp->mid;
	opd->in_progress = true;
	opd->sconn = fsp->conn->sconn;

	/* Copy our current credentials. */
	opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
	if (opd->ux_tok == NULL) {
		TALLOC_FREE(opd);
		return NULL;
	}

	/*
	 * Copy the parent directory name and the
	 * relative path within it.
	 */
	if (parent_dirname(opd,
			fsp->fsp_name->base_name,
			&opd->dname,
			&fname) == false) {
		TALLOC_FREE(opd);
		return NULL;
	}
	opd->fname = talloc_strdup(opd, fname);
	if (opd->fname == NULL) {
		TALLOC_FREE(opd);
		return NULL;
	}

	/* O_DIRECTORY (where available) guarantees dname really is a
	   directory and not, e.g., a symlink-swapped file. */
#if defined(O_DIRECTORY)
	opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
#else
	opd->dir_fd = open(opd->dname, O_RDONLY);
#endif
	if (opd->dir_fd == -1) {
		TALLOC_FREE(opd);
		return NULL;
	}

	/* From here the destructor owns dir_fd and the list linkage. */
	talloc_set_destructor(opd, opd_destructor);
	DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
	return opd;
}
837
838 /*****************************************************************
839  Setup an async open.
840 *****************************************************************/
841
842 static int open_async(const files_struct *fsp,
843                         int flags,
844                         mode_t mode)
845 {
846         struct aio_open_private_data *opd = NULL;
847         int ret;
848
849         if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
850                         &open_pool,
851                         aio_open_handle_completion)) {
852                 return -1;
853         }
854
855         opd = create_private_open_data(fsp, flags, mode);
856         if (opd == NULL) {
857                 DEBUG(10, ("open_async: Could not create private data.\n"));
858                 return -1;
859         }
860
861         ret = pthreadpool_add_job(open_pool,
862                                 opd->jobid,
863                                 aio_open_worker,
864                                 (void *)opd);
865         if (ret) {
866                 errno = ret;
867                 return -1;
868         }
869
870         DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
871                 (unsigned long long)opd->mid,
872                 opd->jobid,
873                 opd->dname,
874                 opd->fname));
875
876         /* Cause the calling code to reschedule us. */
877         errno = EINTR; /* Maps to NT_STATUS_RETRY. */
878         return -1;
879 }
880
881 /*****************************************************************
882  Look for a matching SMB2 mid. If we find it we're rescheduled,
883  just return the completed open.
884 *****************************************************************/
885
886 static bool find_completed_open(files_struct *fsp,
887                                 int *p_fd,
888                                 int *p_errno)
889 {
890         struct aio_open_private_data *opd;
891
892         opd = find_open_private_data_by_mid(fsp->mid);
893         if (!opd) {
894                 return false;
895         }
896
897         if (opd->in_progress) {
898                 DEBUG(0,("find_completed_open: mid %llu "
899                         "jobid %d still in progress for "
900                         "file %s/%s. PANIC !\n",
901                         (unsigned long long)opd->mid,
902                         opd->jobid,
903                         opd->dname,
904                         opd->fname));
905                 /* Disaster ! This is an open timeout. Just panic. */
906                 smb_panic("find_completed_open - in_progress\n");
907                 /* notreached. */
908                 return false;
909         }
910
911         *p_fd = opd->ret_fd;
912         *p_errno = opd->ret_errno;
913
914         DEBUG(5,("find_completed_open: mid %llu returning "
915                 "fd = %d, errno = %d (%s) "
916                 "jobid (%d) for file %s\n",
917                 (unsigned long long)opd->mid,
918                 opd->ret_fd,
919                 opd->ret_errno,
920                 strerror(opd->ret_errno),
921                 opd->jobid,
922                 smb_fname_str_dbg(fsp->fsp_name)));
923
924         /* Now we can free the opd. */
925         TALLOC_FREE(opd);
926         return true;
927 }
928
929 /*****************************************************************
930  The core open function. Only go async on O_CREAT|O_EXCL
931  opens to prevent any race conditions.
932 *****************************************************************/
933
934 static int aio_pthread_open_fn(vfs_handle_struct *handle,
935                         struct smb_filename *smb_fname,
936                         files_struct *fsp,
937                         int flags,
938                         mode_t mode)
939 {
940         int my_errno = 0;
941         int fd = -1;
942         bool aio_allow_open = lp_parm_bool(
943                 SNUM(handle->conn), "aio_pthread", "aio open", false);
944
945         if (smb_fname->stream_name) {
946                 /* Don't handle stream opens. */
947                 errno = ENOENT;
948                 return -1;
949         }
950
951         if (!aio_allow_open) {
952                 /* aio opens turned off. */
953                 return open(smb_fname->base_name, flags, mode);
954         }
955
956         if (!(flags & O_CREAT)) {
957                 /* Only creates matter. */
958                 return open(smb_fname->base_name, flags, mode);
959         }
960
961         if (!(flags & O_EXCL)) {
962                 /* Only creates with O_EXCL matter. */
963                 return open(smb_fname->base_name, flags, mode);
964         }
965
966         /*
967          * See if this is a reentrant call - i.e. is this a
968          * restart of an existing open that just completed.
969          */
970
971         if (find_completed_open(fsp,
972                                 &fd,
973                                 &my_errno)) {
974                 errno = my_errno;
975                 return fd;
976         }
977
978         /* Ok, it's a create exclusive call - pass it to a thread helper. */
979         return open_async(fsp, flags, mode);
980 }
981 #endif
982
983 static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
984                                const char *user)
985 {
986         /*********************************************************************
987          * How many threads to initialize ?
988          * 100 per process seems insane as a default until you realize that
989          * (a) Threads terminate after 1 second when idle.
990          * (b) Throttling is done in SMB2 via the crediting algorithm.
991          * (c) SMB1 clients are limited to max_mux (50) outstanding
992          *     requests and Windows clients don't use this anyway.
993          * Essentially we want this to be unlimited unless smb.conf
994          * says different.
995          *********************************************************************/
996         aio_pending_size = lp_parm_int(
997                 SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
998         return SMB_VFS_NEXT_CONNECT(handle, service, user);
999 }
1000
/* VFS dispatch table for this module: thread-pool backed AIO hooks,
 * plus the connect hook that sizes the pool. The threaded open path
 * is only wired in when openat() and per-thread credentials are
 * available, since the worker must switch credentials safely. */
static struct vfs_fn_pointers vfs_aio_pthread_fns = {
	.connect_fn = aio_pthread_connect,
#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
	.open_fn = aio_pthread_open_fn,
#endif
	.aio_read_fn = aio_pthread_read,
	.aio_write_fn = aio_pthread_write,
	.aio_return_fn = aio_pthread_return_fn,
	.aio_cancel_fn = aio_pthread_cancel,
	.aio_error_fn = aio_pthread_error_fn,
	.aio_suspend_fn = aio_pthread_suspend,
};
1013
1014 NTSTATUS vfs_aio_pthread_init(void);
1015 NTSTATUS vfs_aio_pthread_init(void)
1016 {
1017         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
1018                                 "aio_pthread", &vfs_aio_pthread_fns);
1019 }