#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
-#include "lib/pthreadpool/pthreadpool.h"
+#include "../lib/pthreadpool/pthreadpool_tevent.h"
#ifdef HAVE_LINUX_FALLOC_H
#include <linux/falloc.h>
#endif
-#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
-
-/************************************************************************
- Ensure thread pool is initialized.
-***********************************************************************/
-
-static bool init_aio_threadpool(struct tevent_context *ev_ctx,
- struct pthreadpool **pp_pool,
- void (*completion_fn)(struct tevent_context *,
- struct tevent_fd *,
- uint16_t,
- void *))
-{
- struct tevent_fd *sock_event = NULL;
- int ret = 0;
-
- if (*pp_pool) {
- return true;
- }
-
- ret = pthreadpool_init(lp_aio_max_threads(), pp_pool);
- if (ret) {
- errno = ret;
- return false;
- }
- sock_event = tevent_add_fd(ev_ctx,
- NULL,
- pthreadpool_signal_fd(*pp_pool),
- TEVENT_FD_READ,
- completion_fn,
- NULL);
- if (sock_event == NULL) {
- pthreadpool_destroy(*pp_pool);
- *pp_pool = NULL;
- return false;
- }
-
- DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
- (int)lp_aio_max_threads()));
-
- return true;
-}
+#if defined(HAVE_OPENAT) && defined(HAVE_LINUX_THREAD_CREDENTIALS)
/*
 * We must have openat() to do any thread-based
 * opens (this requires Linux per-thread credentials,
 * for now).
 */
-/*
- * NB. This threadpool is shared over all
- * instances of this VFS module in this
- * process, as is the current jobid.
- */
-
-static struct pthreadpool *open_pool;
-static int aio_pthread_open_jobid;
-
struct aio_open_private_data {
struct aio_open_private_data *prev, *next;
/* Inputs. */
- int jobid;
int dir_fd;
+ bool opened_dir_fd;
int flags;
mode_t mode;
uint64_t mid;
bool in_progress;
- const char *fname;
- char *dname;
- struct smbd_server_connection *sconn;
+ struct smb_filename *fsp_name;
+ struct smb_filename *smb_fname;
+ connection_struct *conn;
+ struct smbXsrv_connection *xconn;
const struct security_unix_token *ux_tok;
uint64_t initial_allocation_size;
/* Returns. */
/* List of outstanding requests we have. */
static struct aio_open_private_data *open_pd_list;
-/************************************************************************
- Find the open private data by jobid.
-***********************************************************************/
-
-static struct aio_open_private_data *find_open_private_data_by_jobid(int jobid)
-{
- struct aio_open_private_data *opd;
-
- for (opd = open_pd_list; opd != NULL; opd = opd->next) {
- if (opd->jobid == jobid) {
- return opd;
- }
- }
-
- return NULL;
-}
+static void aio_open_do(struct aio_open_private_data *opd);
+static void opd_free(struct aio_open_private_data *opd);
/************************************************************************
Find the open private data by mid.
Callback when an open completes.
***********************************************************************/
-static void aio_open_handle_completion(struct tevent_context *event_ctx,
- struct tevent_fd *event,
- uint16_t flags,
- void *p)
+static void aio_open_handle_completion(struct tevent_req *subreq)
{
- struct aio_open_private_data *opd = NULL;
- int jobid = 0;
+ struct aio_open_private_data *opd =
+ tevent_req_callback_data(subreq,
+ struct aio_open_private_data);
int ret;
- struct smbXsrv_connection *xconn;
- DEBUG(10, ("aio_open_handle_completion called with flags=%d\n",
- (int)flags));
+ ret = pthreadpool_tevent_job_recv(subreq);
+ TALLOC_FREE(subreq);
- if ((flags & TEVENT_FD_READ) == 0) {
- return;
- }
+ /*
+ * We're no longer in flight. Remove the
+ * destructor used to preserve opd so
+ * a talloc_free actually removes it.
+ */
+ talloc_set_destructor(opd, NULL);
- ret = pthreadpool_finished_jobs(open_pool, &jobid, 1);
- if (ret != 1) {
- smb_panic("aio_open_handle_completion");
- /* notreached. */
+ if (opd->conn == NULL) {
+ /*
+ * We were shutdown closed in flight. No one
+ * wants the result, and state has been reparented
+ * to the NULL context, so just free it so we
+ * don't leak memory.
+ */
+ DBG_NOTICE("aio open request for %s abandoned in flight\n",
+ opd->fsp_name->base_name);
+ if (opd->ret_fd != -1) {
+ close(opd->ret_fd);
+ opd->ret_fd = -1;
+ }
+ /*
+ * Find outstanding event and reschedule so the client
+ * gets an error message return from the open.
+ */
+ schedule_deferred_open_message_smb(opd->xconn, opd->mid);
+ opd_free(opd);
return;
}
- opd = find_open_private_data_by_jobid(jobid);
- if (opd == NULL) {
- DEBUG(0, ("aio_open_handle_completion cannot find jobid %d\n",
- jobid));
- smb_panic("aio_open_handle_completion - no jobid");
- /* notreached. */
- return;
+ if (ret != 0) {
+ bool ok;
+
+ if (ret != EAGAIN) {
+ smb_panic("aio_open_handle_completion");
+ /* notreached. */
+ return;
+ }
+ /*
+ * Make sure we run as the user again
+ */
+ ok = change_to_user_and_service(opd->conn, opd->conn->vuid);
+ if (!ok) {
+ smb_panic("Can't change to user");
+ return;
+ }
+ /*
+ * If we get EAGAIN from pthreadpool_tevent_job_recv() this
+ * means the lower level pthreadpool failed to create a new
+ * thread. Fallback to sync processing in that case to allow
+ * some progress for the client.
+ */
+ aio_open_do(opd);
}
- DEBUG(10,("aio_open_handle_completion: jobid %d mid %llu "
- "for file %s/%s completed\n",
- jobid,
+ DEBUG(10,("aio_open_handle_completion: mid %llu "
+ "for file %s completed\n",
(unsigned long long)opd->mid,
- opd->dname,
- opd->fname));
+ opd->fsp_name->base_name));
opd->in_progress = false;
- /*
- * TODO: In future we need a proper algorithm
- * to find the correct connection for a fsp.
- * For now we only have one connection, so this is correct...
- */
- xconn = opd->sconn->client->connections;
-
/* Find outstanding event and reschedule. */
- if (!schedule_deferred_open_message_smb(xconn, opd->mid)) {
+ if (!schedule_deferred_open_message_smb(opd->xconn, opd->mid)) {
/*
* Outstanding event didn't exist or was
* cancelled. Free up the fd and throw
close(opd->ret_fd);
opd->ret_fd = -1;
}
- TALLOC_FREE(opd);
+ opd_free(opd);
}
}
return;
}
+ aio_open_do(opd);
+}
+
+static void aio_open_do(struct aio_open_private_data *opd)
+{
opd->ret_fd = openat(opd->dir_fd,
- opd->fname,
+ opd->smb_fname->base_name,
opd->flags,
opd->mode);
}
/************************************************************************
- Open private data destructor.
+ Open private data teardown.
***********************************************************************/
-static int opd_destructor(struct aio_open_private_data *opd)
+static void opd_free(struct aio_open_private_data *opd)
{
- if (opd->dir_fd != -1) {
+ if (opd->opened_dir_fd && opd->dir_fd != -1) {
close(opd->dir_fd);
}
DLIST_REMOVE(open_pd_list, opd);
- return 0;
+ TALLOC_FREE(opd);
}
/************************************************************************
Create and initialize a private data struct for async open.
***********************************************************************/
-static struct aio_open_private_data *create_private_open_data(const files_struct *fsp,
- int flags,
- mode_t mode)
+static struct aio_open_private_data *create_private_open_data(
+ TALLOC_CTX *ctx,
+ const struct files_struct *dirfsp,
+ const struct smb_filename *smb_fname,
+ const files_struct *fsp,
+ int flags,
+ mode_t mode)
{
- struct aio_open_private_data *opd = talloc_zero(NULL,
+ struct aio_open_private_data *opd = talloc_zero(ctx,
struct aio_open_private_data);
- const char *fname = NULL;
if (!opd) {
return NULL;
}
- opd->jobid = aio_pthread_open_jobid++;
- opd->dir_fd = -1;
- opd->ret_fd = -1;
- opd->ret_errno = EINPROGRESS;
- opd->flags = flags;
- opd->mode = mode;
- opd->mid = fsp->mid;
- opd->in_progress = true;
- opd->sconn = fsp->conn->sconn;
- opd->initial_allocation_size = fsp->initial_allocation_size;
+ *opd = (struct aio_open_private_data) {
+ .dir_fd = -1,
+ .ret_fd = -1,
+ .ret_errno = EINPROGRESS,
+ .flags = flags,
+ .mode = mode,
+ .mid = fsp->mid,
+ .in_progress = true,
+ .conn = fsp->conn,
+ /*
+ * TODO: In future we need a proper algorithm
+ * to find the correct connection for a fsp.
+ * For now we only have one connection, so this is correct...
+ */
+ .xconn = fsp->conn->sconn->client->connections,
+ .initial_allocation_size = fsp->initial_allocation_size,
+ };
/* Copy our current credentials. */
opd->ux_tok = copy_unix_token(opd, get_current_utok(fsp->conn));
if (opd->ux_tok == NULL) {
- TALLOC_FREE(opd);
+ opd_free(opd);
return NULL;
}
/*
- * Copy the parent directory name and the
- * relative path within it.
+	 * Copy the full fsp_name and the smb_fname (which is the basename).
*/
- if (parent_dirname(opd,
- fsp->fsp_name->base_name,
- &opd->dname,
- &fname) == false) {
- TALLOC_FREE(opd);
+ opd->smb_fname = cp_smb_filename(opd, smb_fname);
+ if (opd->smb_fname == NULL) {
+ opd_free(opd);
return NULL;
}
- opd->fname = talloc_strdup(opd, fname);
- if (opd->fname == NULL) {
- TALLOC_FREE(opd);
+
+ opd->fsp_name = cp_smb_filename(opd, fsp->fsp_name);
+ if (opd->fsp_name == NULL) {
+ opd_free(opd);
return NULL;
}
+ if (fsp_get_pathref_fd(dirfsp) != AT_FDCWD) {
+ opd->dir_fd = fsp_get_pathref_fd(dirfsp);
+ } else {
#if defined(O_DIRECTORY)
- opd->dir_fd = open(opd->dname, O_RDONLY|O_DIRECTORY);
+ opd->dir_fd = open(".", O_RDONLY|O_DIRECTORY);
#else
- opd->dir_fd = open(opd->dname, O_RDONLY);
+ opd->dir_fd = open(".", O_RDONLY);
#endif
+ opd->opened_dir_fd = true;
+ }
if (opd->dir_fd == -1) {
- TALLOC_FREE(opd);
+ opd_free(opd);
return NULL;
}
- talloc_set_destructor(opd, opd_destructor);
- DLIST_ADD_END(open_pd_list, opd, struct aio_open_private_data *);
+ DLIST_ADD_END(open_pd_list, opd);
return opd;
}
+static int opd_inflight_destructor(struct aio_open_private_data *opd)
+{
+	/*
+	 * Setting conn to NULL lets the completion
+	 * handler discover that the connection was
+	 * torn down, which kills the fsp that owns
+	 * opd.
+	 */
+ DBG_NOTICE("aio open request for %s cancelled\n",
+ opd->fsp_name->base_name);
+ opd->conn = NULL;
+ /* Don't let opd go away. */
+ return -1;
+}
+
/*****************************************************************
Setup an async open.
*****************************************************************/
-static int open_async(const files_struct *fsp,
- int flags,
- mode_t mode)
+static int open_async(const struct files_struct *dirfsp,
+ const struct smb_filename *smb_fname,
+ const files_struct *fsp,
+ int flags,
+ mode_t mode)
{
struct aio_open_private_data *opd = NULL;
- int ret;
+ struct tevent_req *subreq = NULL;
- if (!init_aio_threadpool(fsp->conn->sconn->ev_ctx,
- &open_pool,
- aio_open_handle_completion)) {
- return -1;
- }
-
- opd = create_private_open_data(fsp, flags, mode);
+ /*
+ * Allocate off fsp->conn, not NULL or fsp. As we're going
+ * async fsp will get talloc_free'd when we return
+ * EINPROGRESS/NT_STATUS_MORE_PROCESSING_REQUIRED. A new fsp
+ * pointer gets allocated on every re-run of the
+ * open code path. Allocating on fsp->conn instead
+	 * of NULL allows us to get notified via the
+	 * destructor if the conn is force-closed or we
+	 * shut down. opd is always safely freed on all
+	 * codepaths, so there are no memory leaks.
+ */
+ opd = create_private_open_data(fsp->conn,
+ dirfsp,
+ smb_fname,
+ fsp,
+ flags,
+ mode);
if (opd == NULL) {
DEBUG(10, ("open_async: Could not create private data.\n"));
return -1;
}
- ret = pthreadpool_add_job(open_pool,
- opd->jobid,
- aio_open_worker,
- (void *)opd);
- if (ret) {
- errno = ret;
+ subreq = pthreadpool_tevent_job_send(opd,
+ fsp->conn->sconn->ev_ctx,
+ fsp->conn->sconn->pool,
+ aio_open_worker, opd);
+ if (subreq == NULL) {
+ opd_free(opd);
return -1;
}
+ tevent_req_set_callback(subreq, aio_open_handle_completion, opd);
- DEBUG(5,("open_async: mid %llu jobid %d created for file %s/%s\n",
+ DEBUG(5,("open_async: mid %llu created for file %s\n",
(unsigned long long)opd->mid,
- opd->jobid,
- opd->dname,
- opd->fname));
+ opd->fsp_name->base_name));
+
+ /*
+ * Add a destructor to protect us from connection
+ * teardown whilst the open thread is in flight.
+ */
+ talloc_set_destructor(opd, opd_inflight_destructor);
/* Cause the calling code to reschedule us. */
- errno = EINTR; /* Maps to NT_STATUS_RETRY. */
+ errno = EINPROGRESS; /* Maps to NT_STATUS_MORE_PROCESSING_REQUIRED. */
return -1;
}
if (opd->in_progress) {
DEBUG(0,("find_completed_open: mid %llu "
- "jobid %d still in progress for "
- "file %s/%s. PANIC !\n",
+ "still in progress for "
+ "file %s. PANIC !\n",
(unsigned long long)opd->mid,
- opd->jobid,
- opd->dname,
- opd->fname));
+ opd->fsp_name->base_name));
/* Disaster ! This is an open timeout. Just panic. */
smb_panic("find_completed_open - in_progress\n");
/* notreached. */
DEBUG(5,("find_completed_open: mid %llu returning "
"fd = %d, errno = %d (%s) "
- "jobid (%d) for file %s\n",
+ "for file %s\n",
(unsigned long long)opd->mid,
opd->ret_fd,
opd->ret_errno,
strerror(opd->ret_errno),
- opd->jobid,
smb_fname_str_dbg(fsp->fsp_name)));
/* Now we can free the opd. */
- TALLOC_FREE(opd);
+ opd_free(opd);
return true;
}
opens to prevent any race conditions.
*****************************************************************/
-static int aio_pthread_open_fn(vfs_handle_struct *handle,
- struct smb_filename *smb_fname,
- files_struct *fsp,
- int flags,
- mode_t mode)
+static int aio_pthread_openat_fn(vfs_handle_struct *handle,
+ const struct files_struct *dirfsp,
+ const struct smb_filename *smb_fname,
+ struct files_struct *fsp,
+ const struct vfs_open_how *how)
{
int my_errno = 0;
int fd = -1;
bool aio_allow_open = lp_parm_bool(
SNUM(handle->conn), "aio_pthread", "aio open", false);
- if (smb_fname->stream_name) {
+ if (how->resolve != 0) {
+ errno = ENOSYS;
+ return -1;
+ }
+
+ if (is_named_stream(smb_fname)) {
/* Don't handle stream opens. */
errno = ENOENT;
return -1;
}
- if (!aio_allow_open) {
- /* aio opens turned off. */
- return open(smb_fname->base_name, flags, mode);
+ if (fsp->conn->sconn->pool == NULL) {
+ /*
+ * a threadpool is required for async support
+ */
+ aio_allow_open = false;
+ }
+
+ if (fsp->conn->sconn->client != NULL &&
+ fsp->conn->sconn->client->server_multi_channel_enabled) {
+ /*
+ * This module is not compatible with multi channel yet.
+ */
+ aio_allow_open = false;
}
- if (!(flags & O_CREAT)) {
+ if (fsp->fsp_flags.is_pathref) {
+ /* Use SMB_VFS_NEXT_OPENAT() to call openat() with O_PATH. */
+ aio_allow_open = false;
+ }
+
+ if (!(how->flags & O_CREAT)) {
/* Only creates matter. */
- return open(smb_fname->base_name, flags, mode);
+ aio_allow_open = false;
}
- if (!(flags & O_EXCL)) {
+ if (!(how->flags & O_EXCL)) {
/* Only creates with O_EXCL matter. */
- return open(smb_fname->base_name, flags, mode);
+ aio_allow_open = false;
+ }
+
+ if (!aio_allow_open) {
+ /* aio opens turned off. */
+ return SMB_VFS_NEXT_OPENAT(handle,
+ dirfsp,
+ smb_fname,
+ fsp,
+ how);
}
/*
}
/* Ok, it's a create exclusive call - pass it to a thread helper. */
- return open_async(fsp, flags, mode);
+ return open_async(dirfsp, smb_fname, fsp, how->flags, how->mode);
}
#endif
static struct vfs_fn_pointers vfs_aio_pthread_fns = {
-#if defined(HAVE_OPENAT) && defined(USE_LINUX_THREAD_CREDENTIALS)
- .open_fn = aio_pthread_open_fn,
+#if defined(HAVE_OPENAT) && defined(HAVE_LINUX_THREAD_CREDENTIALS)
+ .openat_fn = aio_pthread_openat_fn,
#endif
};
-NTSTATUS vfs_aio_pthread_init(void);
-NTSTATUS vfs_aio_pthread_init(void)
+static_decl_vfs;
+NTSTATUS vfs_aio_pthread_init(TALLOC_CTX *ctx)
{
return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
"aio_pthread", &vfs_aio_pthread_fns);