2 * Simulate Posix AIO using pthreads.
4 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
6 * Copyright (C) Volker Lendecke 2008
7 * Copyright (C) Jeremy Allison 2012
9 * This program is free software; you can redistribute it and/or modify
10 * it under the terms of the GNU General Public License as published by
11 * the Free Software Foundation; either version 3 of the License, or
12 * (at your option) any later version.
14 * This program is distributed in the hope that it will be useful,
15 * but WITHOUT ANY WARRANTY; without even the implied warranty of
16 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
17 * GNU General Public License for more details.
19 * You should have received a copy of the GNU General Public License
20 * along with this program; if not, write to the Free Software
21 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "smbd/globals.h"
29 #include "lib/pthreadpool/pthreadpool.h"
/*
 * Process-wide thread pool for read/write jobs. It is used by
 * aio_pthread_read(), aio_pthread_write() and the completion handlers. It
 * is a file-scope static, so every instance of this module in the process
 * shares it.
 */
32 static struct pthreadpool *pool;
/*
 * Source of job ids for read/write requests (see create_private_data()).
 * NOTE(review): this is a signed int that is only ever incremented. Going
 * past INT_MAX is undefined behaviour in C. Confirm that cannot happen in a
 * long-lived process, or make the counter unsigned.
 */
33 static int aio_pthread_jobid;
/*
 * Bookkeeping for one outstanding read/write request.
 * Besides the members listed below, this file also uses: jobid, ret_size,
 * ret_errno, write_command, flush_write and cancelled.
 */
35 struct aio_private_data {
/* Links for pd_list. */
36 struct aio_private_data *prev, *next;
/* Caller's control block: supplies the fd, buffer, length and offset. */
38 SMB_STRUCT_AIOCB *aiocb;
46 /* List of outstanding requests we have. */
/*
 * Entries are appended by create_private_data() and unlinked by
 * pd_destructor(). Lookups walk the list linearly, by jobid or by aiocb.
 */
47 static struct aio_private_data *pd_list;
/*
 * Forward declaration: aio_pthread_read() and aio_pthread_write() pass this
 * handler to init_aio_threadpool() before its definition appears.
 */
49 static void aio_pthread_handle_completion(struct event_context *event_ctx,
50 struct fd_event *event,
55 /************************************************************************
56 Ensure thread pool is initialized.
57 ***********************************************************************/
/*
 * Parameters:
 *   ev_ctx        - event context on which the pool's signal fd is watched.
 *   pp_pool       - in/out pool pointer, filled in by pthreadpool_init()
 *                   with up to aio_pending_size threads.
 *   completion_fn - fd-event handler for finished jobs.
 * Returns false on failure; callers treat that as an error.
 * If adding the fd event fails, the freshly created pool is destroyed again.
 * NOTE(review): confirm *pp_pool is reset to NULL after
 * pthreadpool_destroy(). A stale pointer would let a later call reuse a
 * freed pool.
 */
59 static bool init_aio_threadpool(struct event_context *ev_ctx,
60 struct pthreadpool **pp_pool,
61 void (*completion_fn)(struct event_context *,
66 struct fd_event *sock_event = NULL;
73 ret = pthreadpool_init(aio_pending_size, pp_pool);
78 sock_event = tevent_add_fd(ev_ctx,
80 pthreadpool_signal_fd(*pp_pool),
84 if (sock_event == NULL) {
85 pthreadpool_destroy(*pp_pool);
90 DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
97 /************************************************************************
98 Worker function - core of the pthread aio engine.
99 This is the function that actually does the IO.
100 ***********************************************************************/
/*
 * Runs on a pool thread; it is queued via pthreadpool_add_job().
 * Writes (pd->write_command) use pwrite and reads use pread, taking the fd,
 * buffer, length and offset from pd->aiocb.
 * The byte count goes to pd->ret_size. On failure, errno is saved in
 * pd->ret_errno.
 * If the positional call fails with ESPIPE, plain write()/read() is used
 * instead and the offset is ignored.
 * For writes with flush_write set, a successful write is followed by a
 * best-effort fsync() whose result is discarded.
 */
102 static void aio_worker(void *private_data)
104 struct aio_private_data *pd =
105 (struct aio_private_data *)private_data;
107 if (pd->write_command) {
108 pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
109 (const void *)pd->aiocb->aio_buf,
110 pd->aiocb->aio_nbytes,
111 pd->aiocb->aio_offset);
112 if (pd->ret_size == -1 && errno == ESPIPE) {
113 /* Maintain the fiction that pipes can
114 be seeked (sought?) on. */
115 pd->ret_size = sys_write(pd->aiocb->aio_fildes,
116 (const void *)pd->aiocb->aio_buf,
117 pd->aiocb->aio_nbytes);
119 if (pd->ret_size != -1 && pd->flush_write) {
121 * Optimization - flush if requested.
122 * Ignore error as upper layer will
125 (void)fsync(pd->aiocb->aio_fildes);
128 pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
129 (void *)pd->aiocb->aio_buf,
130 pd->aiocb->aio_nbytes,
131 pd->aiocb->aio_offset);
132 if (pd->ret_size == -1 && errno == ESPIPE) {
133 /* Maintain the fiction that pipes can
134 be seeked (sought?) on. */
135 pd->ret_size = sys_read(pd->aiocb->aio_fildes,
136 (void *)pd->aiocb->aio_buf,
137 pd->aiocb->aio_nbytes);
/* Capture errno immediately, before any other call can overwrite it. */
140 if (pd->ret_size == -1) {
141 pd->ret_errno = errno;
147 /************************************************************************
148 Private data destructor.
149 ***********************************************************************/
/*
 * talloc destructor installed by create_private_data(). It unlinks pd from
 * pd_list when the private data is freed, so lookups never see a dead
 * entry.
 */
151 static int pd_destructor(struct aio_private_data *pd)
153 DLIST_REMOVE(pd_list, pd);
157 /************************************************************************
158 Create and initialize a private data struct.
159 ***********************************************************************/
/*
 * Allocates a zeroed aio_private_data as a talloc child of ctx. The read
 * and write paths pass the request's aio_extra, so pd is freed along with
 * it. It then:
 *   - assigns the next job id,
 *   - marks the request EINPROGRESS,
 *   - installs pd_destructor,
 *   - appends the request to pd_list.
 */
161 static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
162 SMB_STRUCT_AIOCB *aiocb)
164 struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
168 pd->jobid = aio_pthread_jobid++;
/* aio_pthread_error_fn() reports this until the worker stores a result. */
171 pd->ret_errno = EINPROGRESS;
172 talloc_set_destructor(pd, pd_destructor);
173 DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
177 /************************************************************************
178 Spin off a threadpool (if needed) and initiate a pread call.
179 ***********************************************************************/
/*
 * Flow:
 *   1. Ensure the shared pool exists.
 *   2. Create private data owned by the request's aio_extra, which is taken
 *      from aiocb->aio_sigevent.sigev_value.
 *   3. Queue aio_worker() on the pool under pd->jobid.
 * The result is delivered later through aio_pthread_handle_completion().
 */
181 static int aio_pthread_read(struct vfs_handle_struct *handle,
182 struct files_struct *fsp,
183 SMB_STRUCT_AIOCB *aiocb)
185 struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
186 struct aio_private_data *pd = NULL;
189 if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
191 aio_pthread_handle_completion)) {
195 pd = create_private_data(aio_ex, aiocb);
197 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
201 ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
207 DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
208 "of %llu bytes at offset %llu\n",
210 (unsigned long long)pd->aiocb->aio_nbytes,
211 (unsigned long long)pd->aiocb->aio_offset));
216 /************************************************************************
217 Spin off a threadpool (if needed) and initiate a pwrite call.
218 ***********************************************************************/
/*
 * Same flow as aio_pthread_read(), with two differences:
 *   - the request is marked as a write;
 *   - it may be marked for a post-write fsync (see below).
 * The result is delivered later through aio_pthread_handle_completion().
 */
220 static int aio_pthread_write(struct vfs_handle_struct *handle,
221 struct files_struct *fsp,
222 SMB_STRUCT_AIOCB *aiocb)
224 struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
225 struct aio_private_data *pd = NULL;
228 if (!init_aio_threadpool(handle->conn->sconn->ev_ctx,
230 aio_pthread_handle_completion)) {
234 pd = create_private_data(aio_ex, aiocb);
236 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
240 pd->write_command = true;
/*
 * Request an fsync after the write only when "strict sync" is enabled for
 * the share AND either "sync always" is set or the client asked for
 * write-through.
 */
241 if (lp_strict_sync(SNUM(fsp->conn)) &&
242 (lp_syncalways(SNUM(fsp->conn)) ||
243 aio_write_through_requested(aio_ex))) {
244 pd->flush_write = true;
248 ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
254 DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
255 "of %llu bytes at offset %llu\n",
257 (unsigned long long)pd->aiocb->aio_nbytes,
258 (unsigned long long)pd->aiocb->aio_offset));
263 /************************************************************************
264 Find the private data by jobid.
265 ***********************************************************************/
/*
 * Linear walk of pd_list; cost grows with the number of outstanding
 * requests. Callers log "cannot find jobid" when the lookup fails.
 */
267 static struct aio_private_data *find_private_data_by_jobid(int jobid)
269 struct aio_private_data *pd;
271 for (pd = pd_list; pd != NULL; pd = pd->next) {
272 if (pd->jobid == jobid) {
280 /************************************************************************
281 Callback when an IO completes.
282 ***********************************************************************/
/*
 * fd-event handler for the shared pool's signal fd. It:
 *   1. checks for EVENT_FD_READ;
 *   2. fetches one finished job id from the pool, panicking if that fails;
 *   3. looks up the job's private data (an unknown id is logged at level 1);
 *   4. hands the owning aio_extra to smbd_aio_complete_aio_ex().
 * NOTE(review): pd->cancelled (set by aio_pthread_cancel()) is not
 * consulted here. Confirm cancelled requests are filtered elsewhere, e.g.
 * via aio_pthread_error_fn() or smbd_aio_complete_aio_ex().
 */
284 static void aio_pthread_handle_completion(struct event_context *event_ctx,
285 struct fd_event *event,
289 struct aio_extra *aio_ex = NULL;
290 struct aio_private_data *pd = NULL;
294 DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
297 if ((flags & EVENT_FD_READ) == 0) {
301 ret = pthreadpool_finished_job(pool, &jobid);
303 smb_panic("aio_pthread_handle_completion");
307 pd = find_private_data_by_jobid(jobid);
309 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
314 aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
315 smbd_aio_complete_aio_ex(aio_ex);
317 DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
322 /************************************************************************
323 Find the private data by aiocb.
324 ***********************************************************************/
/*
 * Linear walk of pd_list. Matching is by pointer identity of the control
 * block, not by its contents.
 */
326 static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
328 struct aio_private_data *pd;
330 for (pd = pd_list; pd != NULL; pd = pd->next) {
331 if (pd->aiocb == aiocb) {
339 /************************************************************************
340 Called to return the result of a completed AIO.
341 Should only be called if aio_error returns something other than EINPROGRESS.
343 Any other value - return from IO operation.
344 ***********************************************************************/
/*
 * Looks up the request by aiocb.
 * If no request matches, a level-0 message is logged; per that message the
 * call fails with EINVAL.
 * If the IO failed (ret_size == -1), errno is set to the ret_errno saved by
 * the worker thread.
 */
346 static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
347 struct files_struct *fsp,
348 SMB_STRUCT_AIOCB *aiocb)
350 struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
354 DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
365 if (pd->ret_size == -1) {
366 errno = pd->ret_errno;
372 /************************************************************************
373 Called to check the result of an AIO.
375 EINPROGRESS - still in progress.
376 EINVAL - invalid aiocb.
377 ECANCELED - request was cancelled.
378 0 - request completed successfully.
379 Any other value - errno from IO operation.
380 ***********************************************************************/
/*
 * Looks up the request by aiocb and reports its saved ret_errno.
 * create_private_data() initialises that field to EINPROGRESS, so an
 * unfinished request reports "still in progress".
 */
382 static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
383 struct files_struct *fsp,
384 SMB_STRUCT_AIOCB *aiocb)
386 struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
394 return pd->ret_errno;
397 /************************************************************************
398 Called to request the cancel of an AIO, or all of them on a specific
399 fsp if aiocb == NULL.
400 ***********************************************************************/
/*
 * Walks pd_list and flags matching requests as cancelled:
 *   - entries without an aiocb are skipped;
 *   - only requests on fsp's fd are considered;
 *   - when aiocb is non-NULL, only that exact control block matches.
 * The worker thread is not interrupted. pd->cancelled only marks the
 * result to be discarded.
 */
402 static int aio_pthread_cancel(struct vfs_handle_struct *handle,
403 struct files_struct *fsp,
404 SMB_STRUCT_AIOCB *aiocb)
406 struct aio_private_data *pd = NULL;
408 for (pd = pd_list; pd != NULL; pd = pd->next) {
409 if (pd->aiocb == NULL) {
412 if (pd->aiocb->aio_fildes != fsp->fh->fd) {
415 if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
420 * We let the child do its job, but we discard the result when
424 pd->cancelled = true;
430 /************************************************************************
431 Callback for a previously detected job completion.
432 ***********************************************************************/
/*
 * tevent immediate handler scheduled by
 * aio_pthread_handle_suspend_completion(). It handles completions that
 * arrived while suspending but were not being waited for.
 * private_data is the job's aio_private_data. The owning aio_extra is
 * completed through smbd_aio_complete_aio_ex().
 * NOTE(review): the immediate is created with a NULL talloc parent; confirm
 * it is freed here.
 */
434 static void aio_pthread_handle_immediate(struct tevent_context *ctx,
435 struct tevent_immediate *im,
438 struct aio_extra *aio_ex = NULL;
439 struct aio_private_data *pd = (struct aio_private_data *)private_data;
441 aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
442 smbd_aio_complete_aio_ex(aio_ex);
446 /************************************************************************
447 Private data struct used in suspend completion code.
448 ***********************************************************************/
/*
 * Shared between aio_pthread_suspend() and its fd handler.
 * aio_pthread_suspend() loops until sp.num_finished reaches sp.num_entries.
 */
450 struct suspend_private {
/* Control blocks the suspend call is waiting for. */
453 const SMB_STRUCT_AIOCB * const *aiocb_array;
456 /************************************************************************
457 Callback when an IO completes from a suspend call.
458 ***********************************************************************/
/*
 * fd-event handler registered on aio_pthread_suspend()'s private event
 * context. It takes one finished job id from the shared pool:
 *   - if the job's aiocb is in sp->aiocb_array, it is presumably counted
 *     towards sp->num_finished;
 *   - otherwise it is re-queued as an immediate event on
 *     server_event_context(), so the main loop completes it via
 *     aio_pthread_handle_immediate().
 * Failing to allocate the immediate terminates the server.
 * NOTE(review): the "cannot find jobid" DEBUG text below names
 * aio_pthread_handle_completion rather than this function. Consider fixing
 * it so logs point at the right handler.
 */
460 static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
461 struct fd_event *event,
465 struct suspend_private *sp = (struct suspend_private *)p;
466 struct aio_private_data *pd = NULL;
467 struct tevent_immediate *im = NULL;
471 DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
474 if ((flags & EVENT_FD_READ) == 0) {
478 if (pthreadpool_finished_job(pool, &jobid)) {
479 smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
483 pd = find_private_data_by_jobid(jobid);
485 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
490 /* Is this a jobid with an aiocb we're interested in ? */
491 for (i = 0; i < sp->num_entries; i++) {
492 if (sp->aiocb_array[i] == pd->aiocb) {
498 /* Jobid completed we weren't waiting for.
499 We must reschedule this as an immediate event
500 on the main event context. */
501 im = tevent_create_immediate(NULL);
503 exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
506 DEBUG(10,("aio_pthread_handle_suspend_completion: "
507 "re-scheduling job id %d\n",
510 tevent_schedule_immediate(im,
511 server_event_context(),
512 aio_pthread_handle_immediate,
/*
 * Timer callback armed by aio_pthread_suspend() when a timeout is given.
 * private_data points at the suspend call's timed_out flag.
 */
517 static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
518 struct tevent_timer *te,
522 bool *timed_out = (bool *)private_data;
523 /* Remove this timed event handler. */
528 /************************************************************************
529 Called to request everything to stop until all IO is completed.
530 ***********************************************************************/
/*
 * Blocking wait for the control blocks in aiocb_array.
 * It runs a private event context, allocated on a talloc stackframe, that
 * watches the shared pool's signal fd with
 * aio_pthread_handle_suspend_completion().
 * With a timeout, aio_pthread_suspend_timed_out() is armed on the same
 * context.
 * The loop runs until sp.num_finished equals sp.num_entries, or until
 * tevent_loop_once() fails.
 * NOTE(review): when tevent_add_fd() fails this destroys the process-wide
 * `pool` that every other outstanding request depends on. Confirm that is
 * intended and that `pool` is reset to NULL afterwards.
 */
532 static int aio_pthread_suspend(struct vfs_handle_struct *handle,
533 struct files_struct *fsp,
534 const SMB_STRUCT_AIOCB * const aiocb_array[],
536 const struct timespec *timeout)
538 struct event_context *ev = NULL;
539 struct fd_event *sock_event = NULL;
541 struct suspend_private sp;
542 bool timed_out = false;
543 TALLOC_CTX *frame = talloc_stackframe();
545 /* This is a blocking call, and has to use a sub-event loop. */
546 ev = event_context_init(frame);
553 struct timeval tv = convert_timespec_to_timeval(*timeout);
554 struct tevent_timer *te = tevent_add_timer(ev,
556 timeval_current_ofs(tv.tv_sec,
558 aio_pthread_suspend_timed_out,
568 sp.aiocb_array = aiocb_array;
571 sock_event = tevent_add_fd(ev,
573 pthreadpool_signal_fd(pool),
575 aio_pthread_handle_suspend_completion,
577 if (sock_event == NULL) {
578 pthreadpool_destroy(pool);
583 * We're going to cheat here. We know that smbd/aio.c
584 * only calls this when it's waiting for every single
585 * outstanding call to finish on a close, so just wait
586 * individually for each IO to complete. We don't care
587 * what order they finish - only that they all do. JRA.
589 while (sp.num_entries != sp.num_finished) {
590 if (tevent_loop_once(ev) == -1) {
/* The code guarded by this #if implements asynchronous exclusive creates. */
608 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
610 * We must have openat() to do any thread-based
611 * asynchronous opens. We also must be using
612 * thread-specific credentials (Linux-only
617 * NB. This threadpool is shared over all
618 * instances of this VFS module in this
619 * process, as is the current jobid.
622 static struct pthreadpool *open_pool;
/*
 * Source of job ids for open requests.
 * NOTE(review): signed int incremented without bound; overflow past
 * INT_MAX is undefined behaviour.
 */
623 static int aio_pthread_open_jobid;
/*
 * Per-request state for one asynchronous open.
 * Members used in this file include: jobid, mid, dir_fd, dname, fname,
 * ret_fd, ret_errno, in_progress, sconn and ux_tok.
 */
625 struct aio_open_private_data {
/* Links for open_pd_list. */
626 struct aio_open_private_data *prev, *next;
/* Used to reschedule the deferred open (identified by mid) on completion. */
636 struct smbd_server_connection *sconn;
/* Copy of the requester's unix token; the worker adopts it before openat(). */
637 const struct security_unix_token *ux_tok;
643 /* List of outstanding requests we have. */
/*
 * Entries are appended by create_private_open_data() and unlinked by
 * opd_destructor(). They are searched linearly by job id or by SMB mid.
 */
644 static struct aio_open_private_data *open_pd_list;
646 /************************************************************************
647 Find the open private data by jobid.
648 ***********************************************************************/
/* Linear walk of open_pd_list comparing job ids. */
650 static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
652 struct aio_open_private_data *opd;
654 for (opd = open_pd_list; opd != NULL; opd = opd->next) {
655 if (opd->jobid == jobid) {
663 /************************************************************************
664 Find the open private data by mid.
665 ***********************************************************************/
/*
 * Linear walk of open_pd_list comparing SMB message ids. A rescheduled
 * request uses this to find its completed open.
 */
667 static struct aio_open_private_data *find_open_private_data_by_mid(uint64_t mid)
669 struct aio_open_private_data *opd;
671 for (opd = open_pd_list; opd != NULL; opd = opd->next) {
672 if (opd->mid == mid) {
680 /************************************************************************
681 Callback when an open completes.
682 ***********************************************************************/
/*
 * fd-event handler for open_pool's signal fd. It:
 *   1. fetches one finished job id, panicking if that fails or if no
 *      request matches it;
 *   2. clears in_progress;
 *   3. asks smbd to re-run the deferred open identified by opd->mid.
 * If that deferred open no longer exists, the fd opened by the worker (if
 * any) is released instead.
 */
684 static void aio_open_handle_completion(struct event_context *event_ctx,
685 struct fd_event *event,
689 struct aio_open_private_data *opd = NULL;
693 DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
696 if ((flags & EVENT_FD_READ) == 0) {
700 ret = pthreadpool_finished_job(open_pool, &jobid);
702 smb_panic("aio_open_handle_completion");
707 opd = find_open_private_data_by_jobid(jobid);
709 DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
711 smb_panic("aio_open_handle_completion - no jobid");
716 DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
717 "for file %s/%s completed\n",
719 (unsigned long long)opd->mid,
/* find_completed_open() panics if it sees a request still in progress. */
723 opd->in_progress = false;
725 /* Find outstanding event and reschedule. */
726 if (!schedule_deferred_open_message_smb(opd->sconn, opd->mid)) {
728 * Outstanding event didn't exist or was
729 * cancelled. Free up the fd and throw
732 if (opd->ret_fd != -1) {
740 /*****************************************************************
741 The core of the async open code - the worker function. Note we
742 use the new openat() system call to avoid any problems with
743 current working directory changes plus we change credentials
744 on the thread to prevent any security race conditions.
745 *****************************************************************/
/*
 * Runs on an open_pool thread.
 * It first switches this thread to the credentials captured in
 * opd->ux_tok (uid, group list, ...). If that fails, errno is saved in
 * opd->ret_errno.
 * It then calls openat() relative to opd->dir_fd. The resulting fd goes to
 * opd->ret_fd; on failure errno is saved in opd->ret_errno.
 */
747 static void aio_open_worker(void *private_data)
749 struct aio_open_private_data *opd =
750 (struct aio_open_private_data *)private_data;
752 /* Become the correct credential on this thread. */
753 if (set_thread_credentials(opd->ux_tok->uid,
755 (size_t)opd->ux_tok->ngroups,
756 opd->ux_tok->groups) != 0) {
758 opd->ret_errno = errno;
762 opd->ret_fd = openat(opd->dir_fd,
767 if (opd->ret_fd == -1) {
768 opd->ret_errno = errno;
770 /* Create was successful. */
775 /************************************************************************
776 Open private data destructor.
777 ***********************************************************************/
/*
 * talloc destructor installed by create_private_open_data(). It cleans up
 * dir_fd when that is not -1, then unlinks opd from open_pd_list.
 */
779 static int opd_destructor(struct aio_open_private_data *opd)
781 if (opd->dir_fd != -1) {
784 DLIST_REMOVE(open_pd_list, opd);
788 /************************************************************************
789 Create and initialize a private data struct for async open.
790 ***********************************************************************/
/*
 * Allocates request state on the NULL talloc context, so it outlives the
 * current request. It is freed once find_completed_open() has consumed the
 * result. Steps:
 *   1. assign the next open job id;
 *   2. mark the request EINPROGRESS / in_progress;
 *   3. copy the current unix token;
 *   4. split fsp's base name into parent directory (dname) and final
 *      component (fname);
 *   5. open the parent directory to obtain dir_fd for openat().
 */
792 static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
796 struct aio_open_private_data *opd = talloc_zero(NULL,
797 struct aio_open_private_data);
798 const char *fname = NULL;
804 opd->jobid = aio_pthread_open_jobid++;
807 opd->ret_errno = EINPROGRESS;
811 opd->in_progress = true;
812 opd->sconn = fsp->conn->sconn;
814 /* Copy our current credentials. */
815 opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
816 if (opd->ux_tok == NULL) {
822 * Copy the parent directory name and the
823 * relative path within it.
825 if (parent_dirname(opd,
826 fsp->fsp_name->base_name,
832 opd->fname = talloc_strdup(opd, fname);
833 if (opd->fname == NULL) {
838 #if defined(O_DIRECTORY)
839 opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
841 opd->dir_fd = open(opd->dname, O_RDONLY);
843 if (opd->dir_fd == -1) {
848 talloc_set_destructor(opd, opd_destructor);
849 DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
853 /*****************************************************************
 Queue an asynchronous open on open_pool.
855 *****************************************************************/
/*
 * Flow:
 *   1. Ensure open_pool exists, with aio_open_handle_completion() watching
 *      its signal fd.
 *   2. Build the request state.
 *   3. Queue the job on open_pool.
 * On success errno is set to EINTR, which maps to NT_STATUS_RETRY. The
 * caller then defers the SMB request and retries it once the open
 * completes.
 */
857 static int open_async(const files_struct *fsp,
861 struct aio_open_private_data *opd = NULL;
864 if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
866 aio_open_handle_completion)) {
870 opd = create_private_open_data(fsp, flags, mode);
872 DEBUG(10, ("open_async: Could not create private data.\n"));
876 ret = pthreadpool_add_job(open_pool,
885 DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
886 (unsigned long long)opd->mid,
891 /* Cause the calling code to reschedule us. */
892 errno = EINTR; /* Maps to NT_STATUS_RETRY. */
896 /*****************************************************************
897 Look for a matching SMB2 mid. If we find it we're rescheduled,
898 just return the completed open.
899 *****************************************************************/
/*
 * Looks up request state by fsp->mid.
 * A request that is still in_progress is treated as fatal (smb_panic).
 * Otherwise the saved errno is returned through p_errno, the result is
 * logged, and the request state is freed.
 */
901 static bool find_completed_open(files_struct *fsp,
905 struct aio_open_private_data *opd;
907 opd = find_open_private_data_by_mid(fsp->mid);
912 if (opd->in_progress) {
913 DEBUG(0,("find_completed_open: mid %llu "
914 "jobid %d still in progress for "
915 "file %s/%s. PANIC !\n",
916 (unsigned long long)opd->mid,
920 /* Disaster ! This is an open timeout. Just panic. */
921 smb_panic("find_completed_open - in_progress\n");
927 *p_errno = opd->ret_errno;
929 DEBUG(5,("find_completed_open: mid %llu returning "
930 "fd = %d, errno = %d (%s) "
931 "jobid (%d) for file %s\n",
932 (unsigned long long)opd->mid,
935 strerror(opd->ret_errno),
937 smb_fname_str_dbg(fsp->fsp_name)));
939 /* Now we can free the opd. */
944 /*****************************************************************
945 The core open function. Only go async on O_CREAT|O_EXCL
946 opens to prevent any race conditions.
947 *****************************************************************/
/*
 * Decision order:
 *   1. Stream opens are not handled here.
 *   2. The file is opened synchronously with open() unless the share sets
 *      "aio_pthread:aio open" (default false) and the flags include both
 *      O_CREAT and O_EXCL.
 *   3. A retried request whose async open already finished is answered by
 *      find_completed_open().
 *   4. Anything else goes to open_async().
 * NOTE(review): the synchronous fallbacks call open() directly rather than
 * passing the call to the next VFS module (compare SMB_VFS_NEXT_CONNECT in
 * aio_pthread_connect()), so modules further down the stack are bypassed.
 * Confirm this is intended.
 */
949 static int aio_pthread_open_fn(vfs_handle_struct *handle,
950 struct smb_filename *smb_fname,
957 bool aio_allow_open = lp_parm_bool(
958 SNUM(handle->conn), "aio_pthread", "aio open", false);
960 if (smb_fname->stream_name) {
961 /* Don't handle stream opens. */
966 if (!aio_allow_open) {
967 /* aio opens turned off. */
968 return open(smb_fname->base_name, flags, mode);
971 if (!(flags & O_CREAT)) {
972 /* Only creates matter. */
973 return open(smb_fname->base_name, flags, mode);
976 if (!(flags & O_EXCL)) {
977 /* Only creates with O_EXCL matter. */
978 return open(smb_fname->base_name, flags, mode);
982 * See if this is a reentrant call - i.e. is this a
983 * restart of an existing open that just completed.
986 if (find_completed_open(fsp,
993 /* Ok, it's a create exclusive call - pass it to a thread helper. */
994 return open_async(fsp, flags, mode);
/*
 * VFS connect hook. It stores "aio_pthread:aio num threads" (default 100)
 * in aio_pending_size, which init_aio_threadpool() passes to
 * pthreadpool_init(). It then chains to the next module's connect.
 * NOTE(review): aio_pending_size is process-wide and the pools appear to be
 * created only once. The per-share value probably only takes effect for
 * the connection that triggers pool creation; confirm.
 */
998 static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
1001 /*********************************************************************
1002 * How many threads to initialize ?
1003 * 100 per process seems insane as a default until you realize that
1004 * (a) Threads terminate after 1 second when idle.
1005 * (b) Throttling is done in SMB2 via the crediting algorithm.
1006 * (c) SMB1 clients are limited to max_mux (50) outstanding
1007 * requests and Windows clients don't use this anyway.
1008 * Essentially we want this to be unlimited unless smb.conf
1010 *********************************************************************/
1011 aio_pending_size = lp_parm_int(
1012 SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
1013 return SMB_VFS_NEXT_CONNECT(handle, service, user);
/*
 * VFS operations overridden by this module.
 * open_fn is provided only when asynchronous opens are compiled in, i.e.
 * openat() plus per-thread credentials are available.
 */
1016 static struct vfs_fn_pointers vfs_aio_pthread_fns = {
1017 .connect_fn = aio_pthread_connect,
1018 #if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
1019 .open_fn = aio_pthread_open_fn,
1021 .aio_read_fn = aio_pthread_read,
1022 .aio_write_fn = aio_pthread_write,
1023 .aio_return_fn = aio_pthread_return_fn,
1024 .aio_cancel_fn = aio_pthread_cancel,
1025 .aio_error_fn = aio_pthread_error_fn,
1026 .aio_suspend_fn = aio_pthread_suspend,
/*
 * Module entry point: registers vfs_aio_pthread_fns under the name
 * "aio_pthread" for the current VFS interface version.
 */
1029 NTSTATUS vfs_aio_pthread_init(void);
1030 NTSTATUS vfs_aio_pthread_init(void)
1032 return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
1033 "aio_pthread", &vfs_aio_pthread_fns);