2 * Simulate Posix AIO using pthreads.
4 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
6 * Copyright (C) Volker Lendecke 2008
7 * Copyright (C) Jeremy Allison 2012
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 3 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "smbd/globals.h"
29 #include "lib/pthreadpool/pthreadpool.h"
/* Thread pool shared by all read/write requests in this process.
   Jobs are queued on it with pthreadpool_add_job() and collected with
   pthreadpool_finished_job(). It is presumably created on demand by
   init_aio_threadpool(); the &pool argument line is not visible in this
   listing, so confirm. */
32 static struct pthreadpool *pool;
/* Source of pd->jobid values; incremented in create_private_data().
   NOTE(review): this is a signed int, and incrementing past INT_MAX is
   undefined behaviour in C. Consider an unsigned counter if a
   long-lived process can issue that many requests. */
33 static int aio_pthread_jobid;
/* Per-request state for one asynchronous read or write.
   Besides the fields shown, the code below reads and writes jobid,
   ret_size, ret_errno, write_command and cancelled on this struct.
   Their declarations are not visible in this listing. */
35 struct aio_private_data {
/* Links for pd_list, maintained with the DLIST_* macros. */
36 struct aio_private_data *prev, *next;
/* Caller's request. aio_worker() uses aio_fildes, aio_buf, aio_nbytes
   and aio_offset; aio_sigevent.sigev_value.sival_ptr carries the
   struct aio_extra. */
38 SMB_STRUCT_AIOCB *aiocb;
45 /* List of outstanding requests we have. */
/* create_private_data() appends to it and pd_destructor() unlinks from
   it. It is scanned linearly by find_private_data_by_jobid(),
   find_private_data_by_aiocb() and aio_pthread_cancel(). */
46 static struct aio_private_data *pd_list;
/* Forward declaration: the read/write entry points pass this handler
   to init_aio_threadpool() before its definition further down. */
48 static void aio_pthread_handle_completion(struct event_context *event_ctx,
49 struct fd_event *event,
54 /************************************************************************
55 Ensure thread pool is initialized.

Calls pthreadpool_init() with aio_pending_size, which the DEBUG message
below describes as the thread limit. It then registers completion_fn on
the pool's signal fd in ev_ctx. Two paths use this function: the
read/write path (with aio_pthread_handle_completion) and open_async()
(with aio_open_handle_completion). Callers treat a false return as
failure.
56 ***********************************************************************/
58 static bool init_aio_threadpool(struct event_context *ev_ctx,
59 struct pthreadpool **pp_pool,
60 void (*completion_fn)(struct event_context *,
65 struct fd_event *sock_event = NULL;
72 ret = pthreadpool_init(aio_pending_size, pp_pool);
/* completion_fn runs when the pool's signal fd becomes readable,
   i.e. when a worker has finished a job. */
77 sock_event = tevent_add_fd(ev_ctx,
79 pthreadpool_signal_fd(*pp_pool),
83 if (sock_event == NULL) {
/* Without the fd event no completion would ever be noticed, so the
   freshly created pool is discarded.
   NOTE(review): confirm that *pp_pool is reset to NULL afterwards, so
   a later call re-creates the pool instead of reusing a freed pointer. */
84 pthreadpool_destroy(*pp_pool);
89 DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
96 /************************************************************************
97 Worker function - core of the pthread aio engine.
98 This is the function that actually does the IO.

This is the job function that aio_pthread_read() and
aio_pthread_write() queue with pthreadpool_add_job(), so it runs on a
pool thread. It does a pwrite or a pread depending on
pd->write_command and stores the byte count in pd->ret_size. On
failure it saves errno in pd->ret_errno for
aio_pthread_return_fn() and aio_pthread_error_fn() to report.
99 ***********************************************************************/
101 static void aio_worker(void *private_data)
103 struct aio_private_data *pd =
104 (struct aio_private_data *)private_data;
106 if (pd->write_command) {
107 pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
108 (const void *)pd->aiocb->aio_buf,
109 pd->aiocb->aio_nbytes,
110 pd->aiocb->aio_offset);
111 if (pd->ret_size == -1 && errno == ESPIPE) {
112 /* Maintain the fiction that pipes can
113 be seeked (sought?) on. */
/* Positional I/O fails with ESPIPE on a pipe, so fall back to a
   plain write and ignore aio_offset. */
114 pd->ret_size = sys_write(pd->aiocb->aio_fildes,
115 (const void *)pd->aiocb->aio_buf,
116 pd->aiocb->aio_nbytes);
119 pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
120 (void *)pd->aiocb->aio_buf,
121 pd->aiocb->aio_nbytes,
122 pd->aiocb->aio_offset);
123 if (pd->ret_size == -1 && errno == ESPIPE) {
124 /* Maintain the fiction that pipes can
125 be seeked (sought?) on. */
/* Same ESPIPE fallback for reads: a plain read that ignores aio_offset. */
126 pd->ret_size = sys_read(pd->aiocb->aio_fildes,
127 (void *)pd->aiocb->aio_buf,
128 pd->aiocb->aio_nbytes);
/* errno is per-thread, so copy it into pd, where the code that later
   collects the result can read it. How ret_errno is set on success is
   not visible in this listing. */
131 if (pd->ret_size == -1) {
132 pd->ret_errno = errno;
138 /************************************************************************
139 Private data destructor.

create_private_data() installs it with talloc_set_destructor(). It
unlinks pd from pd_list, so a freed request can no longer be found by
the jobid or aiocb lookups.
140 ***********************************************************************/
142 static int pd_destructor(struct aio_private_data *pd)
144 DLIST_REMOVE(pd_list, pd);
148 /************************************************************************
149 Create and initialize a private data struct.

pd is allocated as a talloc child of ctx; the visible callers pass the
request's struct aio_extra. Freeing ctx therefore also frees pd, and
pd_destructor() then removes it from pd_list. Callers treat a NULL
return as failure.
150 ***********************************************************************/
152 static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
153 SMB_STRUCT_AIOCB *aiocb)
155 struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
/* Id used to match the pool's finished-job notification back to this
   request. */
159 pd->jobid = aio_pthread_jobid++;
/* aio_pthread_error_fn() returns this until it is overwritten,
   presumably by the job's result. */
162 pd->ret_errno = EINPROGRESS;
163 talloc_set_destructor(pd, pd_destructor);
164 DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
168 /************************************************************************
169 Spin off a threadpool (if needed) and initiate a pread call.

The request's struct aio_extra is read from
aiocb->aio_sigevent.sigev_value.sival_ptr and becomes the talloc parent
of the private data, so pd is freed no later than aio_ex. The read
itself is done by aio_worker() on a pool thread.
170 ***********************************************************************/
172 static int aio_pthread_read(struct vfs_handle_struct *handle,
173 struct files_struct *fsp,
174 SMB_STRUCT_AIOCB *aiocb)
176 struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
177 struct aio_private_data *pd = NULL;
180 if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
182 aio_pthread_handle_completion)) {
186 pd = create_private_data(aio_ex, aiocb);
188 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
/* Queue the job. The pool later reports completion under pd->jobid. */
192 ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
198 DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
199 "of %llu bytes at offset %llu\n",
201 (unsigned long long)pd->aiocb->aio_nbytes,
202 (unsigned long long)pd->aiocb->aio_offset));
207 /************************************************************************
208 Spin off a threadpool (if needed) and initiate a pwrite call.

Same flow as aio_pthread_read(). The only difference is that the
private data is flagged as a write, so aio_worker() issues a pwrite.
209 ***********************************************************************/
211 static int aio_pthread_write(struct vfs_handle_struct *handle,
212 struct files_struct *fsp,
213 SMB_STRUCT_AIOCB *aiocb)
215 struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
216 struct aio_private_data *pd = NULL;
219 if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
221 aio_pthread_handle_completion)) {
225 pd = create_private_data(aio_ex, aiocb);
227 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
/* Must be set before the job is queued, because aio_worker() reads it
   on the pool thread. */
231 pd->write_command = true;
233 ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
239 DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
240 "of %llu bytes at offset %llu\n",
242 (unsigned long long)pd->aiocb->aio_nbytes,
243 (unsigned long long)pd->aiocb->aio_offset));
248 /************************************************************************
249 Find the private data by jobid.

Does a linear scan of pd_list. It presumably returns NULL when nothing
matches, since the completion handlers log "cannot find jobid" in that
case.
250 ***********************************************************************/
252 static struct aio_private_data *find_private_data_by_jobid(int jobid)
254 struct aio_private_data *pd;
256 for (pd = pd_list; pd != NULL; pd = pd->next) {
257 if (pd->jobid == jobid) {
265 /************************************************************************
266 Callback when an IO completes.

init_aio_threadpool() registers this on the pool's signal fd. Each call
collects one finished jobid from the pool, maps it back to its private
data, and hands the request's struct aio_extra to
smbd_aio_complete_aio_ex().
267 ***********************************************************************/
269 static void aio_pthread_handle_completion(struct event_context *event_ctx,
270 struct fd_event *event,
274 struct aio_extra *aio_ex = NULL;
275 struct aio_private_data *pd = NULL;
279 DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
/* Only readability of the signal fd indicates a finished job. */
282 if ((flags & EVENT_FD_READ) == 0) {
286 ret = pthreadpool_finished_job(pool, &jobid);
288 smb_panic("aio_pthread_handle_completion");
292 pd = find_private_data_by_jobid(jobid);
/* An unknown jobid is logged, not fatal. Presumably the request was
   freed before its job finished. */
294 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
299 aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
/* NOTE(review): no pd->cancelled check is visible here. Confirm that
   cancelled requests are discarded further down the completion path,
   as aio_pthread_cancel() describes. */
300 smbd_aio_complete_aio_ex(aio_ex);
302 DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
307 /************************************************************************
308 Find the private data by aiocb.

Does a linear scan of pd_list, comparing aiocb pointers. The callers
(return/error functions) handle a NULL result as an unknown aiocb.
309 ***********************************************************************/
311 static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
313 struct aio_private_data *pd;
315 for (pd = pd_list; pd != NULL; pd = pd->next) {
316 if (pd->aiocb == aiocb) {
324 /************************************************************************
325 Called to return the result of a completed AIO.
326 Should only be called if aio_error returns something other than EINPROGRESS.
328 Any other value - return from IO operation.

An aiocb with no private data is logged at level 0 as EINVAL. When the
I/O failed (pd->ret_size == -1), errno is set from pd->ret_errno.
pd->ret_size is presumably the return value; the return statements are
not visible in this listing.
329 ***********************************************************************/
331 static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
332 struct files_struct *fsp,
333 SMB_STRUCT_AIOCB *aiocb)
335 struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
339 DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
/* Propagate the worker thread's saved errno to the caller. */
350 if (pd->ret_size == -1) {
351 errno = pd->ret_errno;
357 /************************************************************************
358 Called to check the result of an AIO.
360 EINPROGRESS - still in progress.
361 EINVAL - invalid aiocb.
362 ECANCELED - request was cancelled.
363 0 - request completed successfully.
364 Any other value - errno from IO operation.

The common case returns pd->ret_errno, which create_private_data()
initialises to EINPROGRESS. The EINVAL and ECANCELED cases are handled
on lines not visible in this listing.
365 ***********************************************************************/
367 static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
368 struct files_struct *fsp,
369 SMB_STRUCT_AIOCB *aiocb)
371 struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
379 return pd->ret_errno;
382 /************************************************************************
383 Called to request the cancel of an AIO, or all of them on a specific
384 fsp if aiocb == NULL.

Running jobs are not interrupted. Matching entries are only flagged
with pd->cancelled.
385 ***********************************************************************/
387 static int aio_pthread_cancel(struct vfs_handle_struct *handle,
388 struct files_struct *fsp,
389 SMB_STRUCT_AIOCB *aiocb)
391 struct aio_private_data *pd = NULL;
393 for (pd = pd_list; pd != NULL; pd = pd->next) {
/* Skip entries that no longer reference an aiocb (presumably
   requests whose result has already been collected). */
394 if (pd->aiocb == NULL) {
/* Only requests on this fsp's file descriptor are candidates. */
397 if (pd->aiocb->aio_fildes != fsp->fh->fd) {
/* When a specific aiocb is given, cancel only that request. */
400 if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
405 * We let the child do its job, but we discard the result when
409 pd->cancelled = true;
415 /************************************************************************
416 Callback for a previously detected job completion.

aio_pthread_handle_suspend_completion() schedules this on
server_event_context() for jobs that finished while
aio_pthread_suspend() was running its private event loop, but that
were not among the aiocbs it was waiting for. It completes the request
via smbd_aio_complete_aio_ex().
417 ***********************************************************************/
419 static void aio_pthread_handle_immediate(struct tevent_context *ctx,
420 struct tevent_immediate *im,
423 struct aio_extra *aio_ex = NULL;
424 struct aio_private_data *pd = (struct aio_private_data *)private_data;
/* NOTE(review): pd was captured when the immediate was scheduled. If
   the request is freed before this runs, pd dangles. Also confirm who
   frees im, which the scheduler allocated on the NULL talloc context. */
426 aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
427 smbd_aio_complete_aio_ex(aio_ex);
431 /************************************************************************
432 Private data struct used in suspend completion code.

aio_pthread_suspend() and aio_pthread_handle_suspend_completion() share
this struct. The suspend loop runs until num_finished equals
num_entries. The handler presumably increments num_finished for each
completed aiocb it finds in aiocb_array.
433 ***********************************************************************/
435 struct suspend_private {
/* The aiocbs the caller is waiting for (num_entries of them). */
438 const SMB_STRUCT_AIOCB * const *aiocb_array;
441 /************************************************************************
442 Callback when an IO completes from a suspend call.

Runs inside aio_pthread_suspend()'s private event context. A finished
job whose aiocb is in sp->aiocb_array counts toward the suspend call.
Any other finished job is handed back to the main event context as an
immediate event, so its completion is not lost.
443 ***********************************************************************/
445 static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
446 struct fd_event *event,
450 struct suspend_private *sp = (struct suspend_private *)p;
451 struct aio_private_data *pd = NULL;
452 struct tevent_immediate *im = NULL;
456 DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
459 if ((flags & EVENT_FD_READ) == 0) {
463 if (pthreadpool_finished_job(pool, &jobid)) {
464 smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
468 pd = find_private_data_by_jobid(jobid);
/* NOTE(review): this log message names aio_pthread_handle_completion,
   apparently copied from that function. It should name
   aio_pthread_handle_suspend_completion so the logs are unambiguous. */
470 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
475 /* Is this a jobid with an aiocb we're interested in ? */
476 for (i = 0; i < sp->num_entries; i++) {
477 if (sp->aiocb_array[i] == pd->aiocb) {
483 /* Jobid completed we weren't waiting for.
484 We must reschedule this as an immediate event
485 on the main event context. */
/* NOTE(review): allocated on the NULL talloc context. Confirm it is
   freed once the immediate has run. */
486 im = tevent_create_immediate(NULL);
488 exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
491 DEBUG(10,("aio_pthread_handle_suspend_completion: "
492 "re-scheduling job id %d\n",
495 tevent_schedule_immediate(im,
496 server_event_context(),
497 aio_pthread_handle_immediate,
/* Timer callback that aio_pthread_suspend() arms for its timeout. It
   reports the timeout through the bool passed as private_data; the
   suspend loop's check of that flag is not visible in this listing. */
502 static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
503 struct tevent_timer *te,
507 bool *timed_out = (bool *)private_data;
508 /* Remove this timed event handler. */
513 /************************************************************************
514 Called to request everything to stop until all IO is completed.

Blocks the caller by running a private event context, allocated on a
talloc stackframe, that listens on the pool's signal fd until every
aiocb in aiocb_array has completed. A timer that sets timed_out is
armed from *timeout, presumably only when timeout is non-NULL.
515 ***********************************************************************/
517 static int aio_pthread_suspend(struct vfs_handle_struct *handle,
518 struct files_struct *fsp,
519 const SMB_STRUCT_AIOCB * const aiocb_array[],
521 const struct timespec *timeout)
523 struct event_context *ev = NULL;
524 struct fd_event *sock_event = NULL;
526 struct suspend_private sp;
527 bool timed_out = false;
528 TALLOC_CTX *frame = talloc_stackframe();
530 /* This is a blocking call, and has to use a sub-event loop. */
531 ev = event_context_init(frame);
538 struct timeval tv = convert_timespec_to_timeval(*timeout);
539 struct tevent_timer *te = tevent_add_timer(ev,
541 timeval_current_ofs(tv.tv_sec,
543 aio_pthread_suspend_timed_out,
553 sp.aiocb_array = aiocb_array;
/* Watch the same signal fd as the main completion handler, but on the
   private context, so completions go to the suspend handler while
   this call blocks. */
556 sock_event = tevent_add_fd(ev,
558 pthreadpool_signal_fd(pool),
560 aio_pthread_handle_suspend_completion,
562 if (sock_event == NULL) {
/* NOTE(review): this destroys the process-wide pool even though other
   requests may still be queued on it. Confirm that pool is reset to
   NULL afterwards, and consider failing the suspend without destroying
   the pool. */
563 pthreadpool_destroy(pool);
568 * We're going to cheat here. We know that smbd/aio.c
569 * only calls this when it's waiting for every single
570 * outstanding call to finish on a close, so just wait
571 * individually for each IO to complete. We don't care
572 * what order they finish - only that they all do. JRA.
574 while (sp.num_entries != sp.num_finished) {
575 if (tevent_loop_once(ev) == -1) {
593 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
595 * We must have openat() to do any thread-based
596 * asynchronous opens. We also must be using
597 * thread-specific credentials (Linux-only
602 * NB. This threadpool is shared over all
603 * instances of this VFS module in this
604 * process, as is the current jobid.
/* Pool used only for asynchronous opens, separate from the read/write
   pool. open_async() queues jobs on it; it is presumably created by
   that function's init_aio_threadpool() call. */
607 static struct pthreadpool *open_pool;
/* Source of opd->jobid values; incremented in create_private_open_data(). */
608 static int aio_pthread_open_jobid;
/* Per-request state for one asynchronous open. The code below also
   uses jobid, mid, dir_fd, dname, fname, ret_fd, ret_errno and
   in_progress; their declarations are not visible in this listing. */
610 struct aio_open_private_data {
/* Links for open_pd_list (DLIST_* macros). */
611 struct aio_open_private_data *prev, *next;
/* Connection passed to schedule_deferred_open_message_smb() on completion. */
621 struct smbd_server_connection *sconn;
/* Copy of the requester's credentials; the worker thread adopts them. */
622 const struct security_unix_token *ux_tok;
628 /* List of outstanding requests we have. */
/* create_private_open_data() appends to it and opd_destructor()
   unlinks from it. It is searched by jobid and by SMB mid. */
629 static struct aio_open_private_data *open_pd_list;
631 /************************************************************************
632 Find the open private data by jobid.

Does a linear scan of open_pd_list. It presumably returns NULL when
nothing matches, which aio_open_handle_completion() treats as fatal.
633 ***********************************************************************/
635 static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
637 struct aio_open_private_data *opd;
639 for (opd = open_pd_list; opd != NULL; opd = opd->next) {
640 if (opd->jobid == jobid) {
648 /************************************************************************
649 Find the open private data by mid.

Does a linear scan of open_pd_list keyed on the SMB message id.
find_completed_open() uses it to recognise a restarted open.
650 ***********************************************************************/
652 static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
654 struct aio_open_private_data *opd;
656 for (opd = open_pd_list; opd != NULL; opd = opd->next) {
657 if (opd->mid == mid) {
665 /************************************************************************
666 Callback when an open completes.

open_async() passes this to init_aio_threadpool() for open_pool. It
marks the request as no longer in progress and asks smbd to re-run the
deferred open identified by opd->mid. The re-run picks up the result
through find_completed_open().
667 ***********************************************************************/
669 static void aio_open_handle_completion(struct event_context *event_ctx,
670 struct fd_event *event,
674 struct aio_open_private_data *opd = NULL;
678 DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
681 if ((flags & EVENT_FD_READ) == 0) {
685 ret = pthreadpool_finished_job(open_pool, &jobid);
687 smb_panic("aio_open_handle_completion");
692 opd = find_open_private_data_by_jobid(jobid);
/* Unlike the read/write completion handler, an unknown jobid is
   fatal here. */
694 DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
696 smb_panic("aio_open_handle_completion - no jobid");
701 DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
702 "for file %s/%s completed\n",
704 (unsigned long long)opd->mid,
/* find_completed_open() panics if it sees a request still in progress. */
708 opd->in_progress = false;
710 /* Find outstanding event and reschedule. */
711 if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
713 * Outstanding event didn't exist or was
714 * cancelled. Free up the fd and throw
717 if (opd->ret_fd != -1) {
725 /*****************************************************************
726 The core of the async open code - the worker function. Note we
727 use the new openat() system call to avoid any problems with
728 current working directory changes plus we change credentials
729 on the thread to prevent any security race conditions.

Presumably queued on open_pool by open_async(); the job-function
argument is not visible in this listing. The result is left in
opd->ret_fd and, on failure, in opd->ret_errno for
find_completed_open() to report.
730 *****************************************************************/
732 static void aio_open_worker(void *private_data)
734 struct aio_open_private_data *opd =
735 (struct aio_open_private_data *)private_data;
737 /* Become the correct credential on this thread. */
738 if (set_thread_credentials(opd->ux_tok->uid,
740 (size_t)opd->ux_tok->ngroups,
741 opd->ux_tok->groups) != 0) {
/* Record why the credential switch failed. */
743 opd->ret_errno = errno;
/* Open relative to the parent directory fd captured when the request
   was created. */
747 opd->ret_fd = openat(opd->dir_fd,
752 if (opd->ret_fd == -1) {
753 opd->ret_errno = errno;
755 /* Create was successful. */
760 /************************************************************************
761 Open private data destructor.

create_private_open_data() installs it. If a parent directory fd is
open it is presumably closed here; the close line is not visible in
this listing. It then unlinks opd from open_pd_list.
762 ***********************************************************************/
764 static int opd_destructor(struct aio_open_private_data *opd)
766 if (opd->dir_fd != -1) {
769 DLIST_REMOVE(open_pd_list, opd);
773 /************************************************************************
774 Create and initialize a private data struct for async open.

The struct is allocated on the NULL talloc context, so no request owns
it. It takes a snapshot of everything the worker thread needs: the
caller's credentials, the parent directory (opened as dir_fd) and the
file name relative to it.
775 ***********************************************************************/
777 static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
781 struct aio_open_private_data *opd = talloc_zero(NULL,
782 struct aio_open_private_data);
783 const char *fname = NULL;
789 opd->jobid = aio_pthread_open_jobid++;
792 opd->ret_errno = EINPROGRESS;
/* aio_open_handle_completion() clears this when the job finishes. */
796 opd->in_progress = true;
797 opd->sconn = fsp->conn->sconn;
799 /* Copy our current credentials. */
800 opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
801 if (opd->ux_tok == NULL) {
807 * Copy the parent directory name and the
808 * relative path within it.
810 if (parent_dirname(opd,
811 fsp->fsp_name->base_name,
817 opd->fname = talloc_strdup(opd, fname);
818 if (opd->fname == NULL) {
/* The worker calls openat() relative to this fd, so later
   working-directory changes in the main thread do not affect it. */
823 #if defined(O_DIRECTORY)
824 opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
826 opd->dir_fd = open(opd->dname, O_RDONLY);
828 if (opd->dir_fd == -1) {
833 talloc_set_destructor(opd, opd_destructor);
834 DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
838 /*****************************************************************
Queue an asynchronous open of fsp on open_pool. aio_pthread_open_fn()
only calls this for O_CREAT|O_EXCL opens. After queueing, errno is set
to EINTR, which maps to NT_STATUS_RETRY, so the request is deferred.
aio_open_handle_completion() later reschedules it via
schedule_deferred_open_message_smb().
840 *****************************************************************/
842 static int open_async(const files_struct *fsp,
846 struct aio_open_private_data *opd = NULL;
849 if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
851 aio_open_handle_completion)) {
855 opd = create_private_open_data(fsp, flags, mode);
857 DEBUG(10, ("open_async: Could not create private data.\n"));
861 ret = pthreadpool_add_job(open_pool,
870 DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
871 (unsigned long long)opd->mid,
876 /* Cause the calling code to reschedule us. */
877 errno = EINTR; /* Maps to NT_STATUS_RETRY. */
881 /*****************************************************************
882 Look for a matching SMB2 mid. If we find it we're rescheduled,
883 just return the completed open.

Hands the worker's results back to the caller; *p_errno receives
opd->ret_errno. Finding the request still in progress is fatal
(smb_panic). After reporting, opd is released.
884 *****************************************************************/
886 static bool find_completed_open(files_struct *fsp,
890 struct aio_open_private_data *opd;
892 opd = find_open_private_data_by_mid(fsp->mid);
897 if (opd->in_progress) {
898 DEBUG(0,("find_completed_open: mid %llu "
899 "jobid %d still in progress for "
900 "file %s/%s. PANIC !\n",
901 (unsigned long long)opd->mid,
905 /* Disaster ! This is an open timeout. Just panic. */
906 smb_panic("find_completed_open - in_progress\n");
912 *p_errno = opd->ret_errno;
914 DEBUG(5,("find_completed_open: mid %llu returning "
915 "fd = %d, errno = %d (%s) "
916 "jobid (%d) for file %s\n",
917 (unsigned long long)opd->mid,
920 strerror(opd->ret_errno),
922 smb_fname_str_dbg(fsp->fsp_name)));
924 /* Now we can free the opd. */
929 /*****************************************************************
930 The core open function. Only go async on O_CREAT|O_EXCL
931 opens to prevent any race conditions.

Async opens are disabled unless the share sets
"aio_pthread:aio open" (default false). Stream opens are never made
asynchronous. Any open that is not an exclusive create is done
synchronously with open().
932 *****************************************************************/
934 static int aio_pthread_open_fn(vfs_handle_struct *handle,
935 struct smb_filename *smb_fname,
942 bool aio_allow_open = lp_parm_bool(
943 SNUM(handle->conn), "aio_pthread", "aio open", false);
945 if (smb_fname->stream_name) {
946 /* Don't handle stream opens. */
951 if (!aio_allow_open) {
952 /* aio opens turned off. */
/* NOTE(review): the synchronous paths call open() directly rather than
   SMB_VFS_NEXT_OPEN, so modules stacked below this one never see these
   opens. Confirm this module is meant to be last in the VFS stack. */
953 return open(smb_fname->base_name, flags, mode);
956 if (!(flags & O_CREAT)) {
957 /* Only creates matter. */
958 return open(smb_fname->base_name, flags, mode);
961 if (!(flags & O_EXCL)) {
962 /* Only creates with O_EXCL matter. */
963 return open(smb_fname->base_name, flags, mode);
967 * See if this is a reentrant call - i.e. is this a
968 * restart of an existing open that just completed.
971 if (find_completed_open(fsp,
978 /* Ok, it's a create exclusive call - pass it to a thread helper. */
979 return open_async(fsp, flags, mode);
/* VFS connect hook. Stores the per-share "aio_pthread:aio num threads"
   value (default 100) in the process-wide aio_pending_size, then
   continues with SMB_VFS_NEXT_CONNECT.
   NOTE(review): every connect overwrites this global, but the pools are
   process-wide statics. Presumably only the value in effect when a pool
   is first created takes effect; confirm. */
983 static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
986 /*********************************************************************
987 * How many threads to initialize ?
988 * 100 per process seems insane as a default until you realize that
989 * (a) Threads terminate after 1 second when idle.
990 * (b) Throttling is done in SMB2 via the crediting algorithm.
991 * (c) SMB1 clients are limited to max_mux (50) outstanding
992 * requests and Windows clients don't use this anyway.
993 * Essentially we want this to be unlimited unless smb.conf
995 *********************************************************************/
996 aio_pending_size = lp_parm_int(
997 SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
998 return SMB_VFS_NEXT_CONNECT(handle, service, user);
/* Operations this module implements. open_fn is only wired up when
   HAVE_OPENAT and USE_LINUX_THREAD_CREDENTIALS are both defined,
   matching the guard around the async-open code. */
1001 static struct vfs_fn_pointers vfs_aio_pthread_fns = {
1002 .connect_fn = aio_pthread_connect,
1003 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
1004 .open_fn = aio_pthread_open_fn,
1006 .aio_read_fn = aio_pthread_read,
1007 .aio_write_fn = aio_pthread_write,
1008 .aio_return_fn = aio_pthread_return_fn,
1009 .aio_cancel_fn = aio_pthread_cancel,
1010 .aio_error_fn = aio_pthread_error_fn,
1011 .aio_suspend_fn = aio_pthread_suspend,
/* Module entry point: registers vfs_aio_pthread_fns under the VFS
   module name "aio_pthread" via smb_register_vfs(). */
1014 NTSTATUS vfs_aio_pthread_init(void);
1015 NTSTATUS vfs_aio_pthread_init(void)
1017 return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
1018 "aio_pthread", &vfs_aio_pthread_fns);