/* @} */
+struct tevent_threadpool;
+
+/**
+ * @defgroup tevent_threadpool The tevent_threadpool API
+ *
+ * This API provides a way to run threadsafe functions via tevent
+ * requests in a helper threadpool.
+ */
+
+/**
+ * @brief Create a tevent_threadpool
+ *
+ * A struct tevent_threadpool is the basis for for running tevent
+ * requests in parralel in a threadpool.
+ *
+ * @param[in] max_threads Maximum parallelism in this pool
+ * @param[in] ev tevent_context the pool lives in
+ * @param[out] presult Pointer to the threadpool returned
+ * @return success: 0, failure: errno
+ *
+ * max_threads=0 means unlimited parallelism. The caller has to take
+ * care to not overload the system.
+ */
+struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int max_threads);
+
+/**
+ * @brief Destroy a tevent_threadpool
+ *
+ * Destroy a tevent_threadpool. If jobs are still active, this returns
+ * EBUSY.
+ *
+ * @param[in] pool The pool to destroy
+ * @return success: 0, failure: errno
+ */
+int tevent_threadpool_destroy(struct tevent_threadpool *pool);
+
+/**
+ * @brief Schedule a computation to run in a threadpool
+ *
+ * private_data must be a pointer to a talloced object that is owned
+ * by the calling main thread and must not be freed in the worker
+ * thread.
+ *
+ * The computation can be cancelled by calling talloc_free() on the
+ * tevent_req.
+ *
+ * @param[in] pool The threadpool to use
+ * @param[in] fn Computation funtion to run
+ * @param[in] private_data talloced data to hand to the computation function
+ * @return tevent request on sucess, NULL on failure
+ */
+struct tevent_req *tevent_threadpool_send(struct tevent_threadpool *pool,
+ void (*fn)(void *private_data),
+ void *private_data);
+
+/**
+ * @brief Get the result of a computation
+ *
+ * @param[in] req the computation request
+ * @param[out] perror errno in case of failure
+ * @return 0 on sucess or -1 if an error occured
+ */
+int tevent_threadpool_recv(struct tevent_req *req, int *perror);
+
/**
* @defgroup tevent_wrapper_ops The tevent wrapper operation functions
* @ingroup tevent
void tevent_trace_point_callback(struct tevent_context *ev,
enum tevent_trace_point);
+
+struct threadpool;
+struct threadpool_job;
+
+struct threadpool *threadpool_init(TALLOC_CTX *mem_ctx, unsigned max_threads);
+int threadpool_destroy(struct threadpool *pool);
+struct threadpool_job *threadpool_add_job(TALLOC_CTX *mem_ctx,
+ struct threadpool *pool,
+ int job_id,
+ void (*fn)(void *private_data),
+ void *private_data);
+int threadpool_signal_fd(struct threadpool *pool);
+void threadpool_job_cancel(struct threadpool_job *job);
+bool threadpool_job_is_cancelled(struct threadpool_job *job);
+int threadpool_finished_jobs(struct threadpool *pool, int *jobids,
+ unsigned num_jobids);
--- /dev/null
+/*
+ * Unix SMB/CIFS implementation.
+ * thread pool implementation
+ * Copyright (C) Volker Lendecke 2009
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "config.h"
+#include <errno.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <pthread.h>
+#include <signal.h>
+#include <assert.h>
+#include <fcntl.h>
+#include "system/time.h"
+#include "system/filesys.h"
+#include "replace.h"
+
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "lib/util/dlinklist.h"
+
+struct threadpool_job {
+ /* List of jobs */
+ struct threadpool_job *prev, *next;
+
+ int id;
+ void (*fn)(void *private_data);
+ void *private_data;
+ bool cancel;
+};
+
+struct threadpool {
+ /*
+ * List threadpools for fork safety
+ */
+ struct threadpool *prev, *next;
+
+ /*
+ * Control access to this struct
+ */
+ pthread_mutex_t mutex;
+
+ /*
+ * Threads waiting for work do so here
+ */
+ pthread_cond_t condvar;
+
+ /*
+ * List of jobs
+ */
+ struct threadpool_job *jobs;
+
+ /*
+ * pipe for signalling
+ */
+ int sig_pipe[2];
+
+ /*
+ * indicator to worker threads that they should shut down
+ */
+ int shutdown;
+
+ /*
+ * maximum number of threads
+ */
+ int max_threads;
+
+ /*
+ * Number of threads
+ */
+ int num_threads;
+
+ /*
+ * Number of idle threads
+ */
+ int num_idle;
+
+ /*
+ * An array of threads that require joining.
+ */
+ int num_exited;
+ pthread_t *exited; /* We alloc more */
+};
+
+static pthread_mutex_t threadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct threadpool *threadpools = NULL;
+static pthread_once_t threadpool_atfork_initialized = PTHREAD_ONCE_INIT;
+
+static void threadpool_prep_atfork(void);
+
+/*
+ * Initialize a thread pool
+ */
+
+struct threadpool *threadpool_init(TALLOC_CTX *mem_ctx,
+ unsigned max_threads)
+{
+ struct threadpool *pool;
+ int ret;
+
+ pool = talloc_zero(mem_ctx, struct threadpool);
+ if (pool == NULL) {
+ return NULL;
+ }
+
+ ret = pipe(pool->sig_pipe);
+ if (ret == -1) {
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+
+ ret = pthread_mutex_init(&pool->mutex, NULL);
+ if (ret != 0) {
+ close(pool->sig_pipe[0]);
+ close(pool->sig_pipe[1]);
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+
+ ret = pthread_cond_init(&pool->condvar, NULL);
+ if (ret != 0) {
+ pthread_mutex_destroy(&pool->mutex);
+ close(pool->sig_pipe[0]);
+ close(pool->sig_pipe[1]);
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+
+ pool->shutdown = 0;
+ pool->num_threads = 0;
+ pool->num_exited = 0;
+ pool->exited = NULL;
+ pool->max_threads = max_threads;
+ pool->num_idle = 0;
+
+ ret = pthread_mutex_lock(&threadpools_mutex);
+ if (ret != 0) {
+ pthread_cond_destroy(&pool->condvar);
+ pthread_mutex_destroy(&pool->mutex);
+ close(pool->sig_pipe[0]);
+ close(pool->sig_pipe[1]);
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+ DLIST_ADD(threadpools, pool);
+
+ ret = pthread_mutex_unlock(&threadpools_mutex);
+ assert(ret == 0);
+
+ pthread_once(&threadpool_atfork_initialized, threadpool_prep_atfork);
+
+ return pool;
+}
+
+static void threadpool_prepare(void)
+{
+ int ret;
+ struct threadpool *pool;
+
+ ret = pthread_mutex_lock(&threadpools_mutex);
+ assert(ret == 0);
+
+ pool = threadpools;
+
+ while (pool != NULL) {
+ ret = pthread_mutex_lock(&pool->mutex);
+ assert(ret == 0);
+ pool = pool->next;
+ }
+}
+
+static void threadpool_parent(void)
+{
+ int ret;
+ struct threadpool *pool;
+
+ for (pool = DLIST_TAIL(threadpools);
+ pool != NULL;
+ pool = DLIST_PREV(pool)) {
+ ret = pthread_mutex_unlock(&pool->mutex);
+ assert(ret == 0);
+ }
+
+ ret = pthread_mutex_unlock(&threadpools_mutex);
+ assert(ret == 0);
+}
+
+static void threadpool_child(void)
+{
+ int ret;
+ struct threadpool *pool;
+
+ for (pool = DLIST_TAIL(threadpools);
+ pool != NULL;
+ pool = DLIST_PREV(pool)) {
+
+ close(pool->sig_pipe[0]);
+ close(pool->sig_pipe[1]);
+
+ ret = pipe(pool->sig_pipe);
+ assert(ret == 0);
+
+ pool->num_threads = 0;
+
+ pool->num_exited = 0;
+ free(pool->exited);
+ pool->exited = NULL;
+
+ pool->num_idle = 0;
+
+ ret = pthread_mutex_unlock(&pool->mutex);
+ assert(ret == 0);
+ }
+
+ ret = pthread_mutex_unlock(&threadpools_mutex);
+ assert(ret == 0);
+}
+
+static void threadpool_prep_atfork(void)
+{
+ pthread_atfork(threadpool_prepare, threadpool_parent,
+ threadpool_child);
+}
+
+/*
+ * Return the file descriptor which becomes readable when a job has
+ * finished
+ */
+
+int threadpool_signal_fd(struct threadpool *pool)
+{
+ return pool->sig_pipe[0];
+}
+
+void threadpool_job_cancel(struct threadpool_job *job)
+{
+ job->cancel = true;
+}
+
+bool threadpool_job_is_cancelled(struct threadpool_job *job)
+{
+ return job->cancel;
+}
+
+/*
+ * Do a pthread_join() on all children that have exited, pool->mutex must be
+ * locked
+ */
+static void threadpool_join_children(struct threadpool *pool)
+{
+ int i;
+
+ for (i=0; i<pool->num_exited; i++) {
+ pthread_join(pool->exited[i], NULL);
+ }
+ pool->num_exited = 0;
+
+ /*
+ * Deliberately not free and NULL pool->exited. That will be
+ * re-used by realloc later.
+ */
+}
+
+/*
+ * Fetch a finished job number from the signal pipe
+ */
+
+int threadpool_finished_jobs(struct threadpool *pool, int *jobids,
+ unsigned num_jobids)
+{
+ ssize_t to_read, nread;
+
+ nread = -1;
+ errno = EINTR;
+
+ to_read = sizeof(int) * num_jobids;
+
+ while ((nread == -1) && (errno == EINTR)) {
+ nread = read(pool->sig_pipe[0], jobids, to_read);
+ }
+ if (nread == -1) {
+ return -errno;
+ }
+ if ((nread % sizeof(int)) != 0) {
+ return -EINVAL;
+ }
+ return nread / sizeof(int);
+}
+
+/*
+ * Destroy a thread pool, finishing all threads working for it
+ */
+
+int threadpool_destroy(struct threadpool *pool)
+{
+ int ret, ret1;
+
+ ret = pthread_mutex_lock(&pool->mutex);
+ if (ret != 0) {
+ return ret;
+ }
+
+ if ((pool->jobs != NULL) || pool->shutdown) {
+ ret = pthread_mutex_unlock(&pool->mutex);
+ assert(ret == 0);
+ return EBUSY;
+ }
+
+ if (pool->num_threads > 0) {
+ /*
+ * We have active threads, tell them to finish, wait for that.
+ */
+
+ pool->shutdown = 1;
+
+ if (pool->num_idle > 0) {
+ /*
+ * Wake the idle threads. They will find
+ * pool->shutdown to be set and exit themselves
+ */
+ ret = pthread_cond_broadcast(&pool->condvar);
+ if (ret != 0) {
+ pthread_mutex_unlock(&pool->mutex);
+ return ret;
+ }
+ }
+
+ while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
+
+ if (pool->num_exited > 0) {
+ threadpool_join_children(pool);
+ continue;
+ }
+ /*
+ * A thread that shuts down will also signal
+ * pool->condvar
+ */
+ ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
+ if (ret != 0) {
+ pthread_mutex_unlock(&pool->mutex);
+ return ret;
+ }
+ }
+ }
+
+ ret = pthread_mutex_unlock(&pool->mutex);
+ if (ret != 0) {
+ return ret;
+ }
+ ret = pthread_mutex_destroy(&pool->mutex);
+ ret1 = pthread_cond_destroy(&pool->condvar);
+
+ if (ret != 0) {
+ return ret;
+ }
+ if (ret1 != 0) {
+ return ret1;
+ }
+
+ ret = pthread_mutex_lock(&threadpools_mutex);
+ if (ret != 0) {
+ return ret;
+ }
+ DLIST_REMOVE(threadpools, pool);
+ ret = pthread_mutex_unlock(&threadpools_mutex);
+ assert(ret == 0);
+
+ close(pool->sig_pipe[0]);
+ pool->sig_pipe[0] = -1;
+
+ close(pool->sig_pipe[1]);
+ pool->sig_pipe[1] = -1;
+
+ free(pool->exited);
+ TALLOC_FREE(pool);
+
+ return 0;
+}
+
+/*
+ * Prepare for pthread_exit(), pool->mutex must be locked
+ */
+static void threadpool_server_exit(struct threadpool *pool)
+{
+ pthread_t *exited;
+
+ pool->num_threads--;
+
+ exited = (pthread_t *)realloc(
+ pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
+
+ if (exited == NULL) {
+ /* lost a thread status */
+ return;
+ }
+ pool->exited = exited;
+
+ pool->exited[pool->num_exited] = pthread_self();
+ pool->num_exited++;
+}
+
+static struct threadpool_job *threadpool_get_job(struct threadpool *p)
+{
+ struct threadpool_job *job = p->jobs;
+
+ if (job) {
+ DLIST_REMOVE(p->jobs, job);
+ }
+ return job;
+}
+
+static struct threadpool_job *threadpool_put_job(TALLOC_CTX *mem_ctx,
+ struct threadpool *p,
+ int id,
+ void (*fn)(void *private_data),
+ void *private_data)
+{
+ struct threadpool_job *job;
+
+ job = talloc_zero(mem_ctx, struct threadpool_job);
+ if (job == NULL) {
+ return NULL;
+ }
+
+ job->id = id;
+ job->fn = fn;
+ job->private_data = private_data;
+ DLIST_ADD_END(p->jobs, job, struct threadpool_job *);
+
+ return job;
+}
+
+static void *threadpool_server(void *arg)
+{
+ struct threadpool *pool = (struct threadpool *)arg;
+ int res;
+
+ res = pthread_mutex_lock(&pool->mutex);
+ if (res != 0) {
+ return NULL;
+ }
+
+ while (1) {
+ struct timespec ts;
+ struct threadpool_job *job;
+
+ /*
+ * idle-wait at most 1 second. If nothing happens in that
+ * time, exit this thread.
+ */
+
+ clock_gettime(CLOCK_MONOTONIC, &ts);
+ ts.tv_sec += 1;
+
+ while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
+
+ pool->num_idle++;
+ res = pthread_cond_timedwait(
+ &pool->condvar, &pool->mutex, &ts);
+ pool->num_idle--;
+
+ if (res == ETIMEDOUT) {
+ if (pool->jobs == NULL) {
+ /*
+ * we timed out and still no work for
+ * us. Exit.
+ */
+ threadpool_server_exit(pool);
+ pthread_mutex_unlock(&pool->mutex);
+ return NULL;
+ }
+
+ break;
+ }
+ assert(res == 0);
+ }
+
+ job = threadpool_get_job(pool);
+ if (job != NULL) {
+ ssize_t written;
+ int sig_pipe = pool->sig_pipe[1];
+
+ printf("threadpool_server: pulled job %d\n", job->id);
+ /*
+ * Do the work with the mutex unlocked
+ */
+
+ res = pthread_mutex_unlock(&pool->mutex);
+ assert(res == 0);
+
+ if (likely(!job->cancel)) {
+ printf("threadpool_server: executing job %d\n", job->id);
+ job->fn(job->private_data);
+ } else {
+ printf("threadpool_server: skipping cancelled job %d\n", job->id);
+ }
+
+ /*
+ * Do the write without the lock, otherwise we
+ * can deadlock with the scheduler
+ */
+ written = write(sig_pipe, &job->id, sizeof(job->id));
+
+ res = pthread_mutex_lock(&pool->mutex);
+ assert(res == 0);
+
+ if (written != sizeof(int)) {
+ threadpool_server_exit(pool);
+ pthread_mutex_unlock(&pool->mutex);
+ return NULL;
+ }
+ }
+
+ if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
+ /*
+ * No more work to do and we're asked to shut down, so
+ * exit
+ */
+ threadpool_server_exit(pool);
+
+ if (pool->num_threads == 0) {
+ /*
+ * Ping the main thread waiting for all of us
+ * workers to have quit.
+ */
+ pthread_cond_broadcast(&pool->condvar);
+ }
+
+ pthread_mutex_unlock(&pool->mutex);
+ return NULL;
+ }
+ }
+}
+
+struct threadpool_job *threadpool_add_job(TALLOC_CTX *mem_ctx,
+ struct threadpool *pool,
+ int job_id,
+ void (*fn)(void *private_data),
+ void *private_data)
+{
+ pthread_t thread_id;
+ int res;
+ sigset_t mask, omask;
+ struct threadpool_job *job;
+
+ res = pthread_mutex_lock(&pool->mutex);
+ assert(res == 0);
+
+ if (pool->shutdown) {
+ /*
+ * Protect against the pool being shut down while
+ * trying to add a job
+ */
+ printf("threadpool_add_job: pool is shutting down\n");
+ res = pthread_mutex_unlock(&pool->mutex);
+ assert(res == 0);
+ return NULL;
+ }
+
+ /*
+ * Just some cleanup under the mutex
+ */
+ threadpool_join_children(pool);
+
+ /*
+ * Add job to the end of the queue
+ */
+ job = threadpool_put_job(mem_ctx, pool, job_id, fn, private_data);
+ if (job == NULL) {
+ printf("threadpool_add_job: threadpool_put_job failed\n");
+ pthread_mutex_unlock(&pool->mutex);
+ errno = ENOMEM;
+ return NULL;
+ }
+
+ if (pool->num_idle > 0) {
+ /*
+ * We have idle threads, wake one.
+ */
+ res = pthread_cond_signal(&pool->condvar);
+ pthread_mutex_unlock(&pool->mutex);
+ return job;
+ }
+
+ if ((pool->max_threads != 0) &&
+ (pool->num_threads >= pool->max_threads)) {
+ /*
+ * No more new threads, we just queue the request
+ */
+ pthread_mutex_unlock(&pool->mutex);
+ return job;
+ }
+
+ /*
+ * Create a new worker thread. It should not receive any signals.
+ */
+
+ sigfillset(&mask);
+
+ res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
+ assert(res == 0);
+
+ res = pthread_create(&thread_id, NULL, threadpool_server,
+ (void *)pool);
+ if (res == 0) {
+ pool->num_threads += 1;
+ }
+
+ res = pthread_sigmask(SIG_SETMASK, &omask, NULL);
+ assert(res == 0);
+
+ pthread_mutex_unlock(&pool->mutex);
+ return job;
+}
/* Notreached. */
}
}
+
+struct tevent_threadpool_state {
+ struct tevent_threadpool *pool;
+ int job_id;
+ bool done;
+ void *private_parent;
+ void *job_private;
+ struct threadpool_job *job;
+};
+
+struct tevent_threadpool {
+ struct tevent_context *ev;
+ struct threadpool *threadpool;
+ int next_job_id;
+ int sig_fd;
+ struct tevent_req **pending;
+
+ struct tevent_threadpool_state **orphaned;
+ int num_orphaned;
+
+ struct tevent_fd *fde;
+};
+
+static void tevent_threadpool_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
+{
+ printf("tevent_threadpool_destructor\n");
+ while (talloc_array_length(pool->pending) != 0) {
+ /* No TALLOC_FREE here */
+ talloc_free(pool->pending[0]);
+ }
+
+ while (pool->num_orphaned != 0) {
+ /*
+ * We've got jobs in the queue for which the tevent_req has
+ * been finished already. Wait for all of them to finish.
+ */
+ tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
+ }
+
+ threadpool_destroy(pool->threadpool);
+ pool->threadpool = NULL;
+
+ return 0;
+}
+
+struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int max_threads)
+{
+ struct tevent_threadpool *pool;
+
+ pool = talloc_zero(mem_ctx, struct tevent_threadpool);
+ if (pool == NULL) {
+ return NULL;
+ }
+ pool->ev = ev;
+
+ pool->threadpool = threadpool_init(pool, max_threads);
+ if (pool->threadpool == NULL) {
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+ talloc_set_destructor(pool, tevent_threadpool_destructor);
+
+ pool->sig_fd = threadpool_signal_fd(pool->threadpool);
+ if (pool->sig_fd == -1) {
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+
+ return pool;
+}
+
+static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
+{
+ int num_pending = talloc_array_length(pool->pending);
+ int result;
+
+ while (true) {
+ int i;
+
+ result = pool->next_job_id++;
+ if (result == 0) {
+ continue;
+ }
+
+ for (i = 0; i < num_pending; i++) {
+ struct tevent_threadpool_state *state = tevent_req_data(
+ pool->pending[i], struct tevent_threadpool_state);
+
+ if (result == state->job_id) {
+ break;
+ }
+ }
+ if (i == num_pending) {
+ return result;
+ }
+ }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+
+static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
+ struct tevent_threadpool *pool,
+ struct tevent_context *ev)
+{
+ struct tevent_req **pending;
+ int num_pending, orphaned_array_length;
+
+ num_pending = talloc_array_length(pool->pending);
+ printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
+ num_pending);
+ pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
+ num_pending + 1);
+ if (pending == NULL) {
+ return false;
+ }
+ pending[num_pending] = req;
+ num_pending++;
+ pool->pending = pending;
+ tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
+
+ /*
+ * Make sure that the orphaned array of
+ * tevent_threadpool_state structs has enough space. A job can
+ * change from pending to orphaned in
+ * tevent_threadpool_cleanup, and to fail in a talloc
+ * destructor should be avoided if possible.
+ */
+
+ orphaned_array_length = talloc_array_length(pool->orphaned);
+ if (num_pending > orphaned_array_length) {
+ struct tevent_threadpool_state **orphaned;
+
+ orphaned = talloc_realloc(pool, pool->orphaned,
+ struct tevent_threadpool_state *,
+ orphaned_array_length + 1);
+ if (orphaned == NULL) {
+ tevent_threadpool_req_unset_pending(req);
+ return false;
+ }
+ pool->orphaned = orphaned;
+ }
+
+ if (pool->fde != NULL) {
+ return true;
+ }
+
+ pool->fde = tevent_add_fd(pool->ev,
+ pool,
+ pool->sig_fd, TEVENT_FD_READ,
+ tevent_threadpool_handler, pool);
+ if (pool->fde == NULL) {
+ tevent_threadpool_req_unset_pending(req);
+ return false;
+ }
+ return true;
+}
+
+static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
+{
+ struct tevent_threadpool *pool = state->pool;
+ int num_pending = talloc_array_length(pool->pending);
+
+ if (num_pending == 0 && pool->num_orphaned == 0) {
+ printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+ TALLOC_FREE(pool->fde);
+ }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
+{
+ struct tevent_threadpool_state *state = tevent_req_data(
+ req, struct tevent_threadpool_state);
+ struct tevent_threadpool *pool = state->pool;
+ int num_pending = talloc_array_length(pool->pending);
+ int i;
+
+ tevent_req_set_cleanup_fn(req, NULL);
+
+ if (num_pending == 1) {
+ if (pool->num_orphaned == 0) {
+ printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+ TALLOC_FREE(pool->fde);
+ }
+ TALLOC_FREE(pool->pending);
+ return;
+ }
+
+ for (i = 0; i < num_pending; i++) {
+ if (req == pool->pending[i]) {
+ break;
+ }
+ }
+ if (i == num_pending) {
+ return;
+ }
+ if (num_pending > 1) {
+ pool->pending[i] = pool->pending[num_pending - 1];
+ }
+ pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
+ num_pending - 1);
+}
+
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tevent_threadpool_state *state = tevent_req_data(
+ req, struct tevent_threadpool_state);
+ struct tevent_threadpool *pool = state->pool;
+
+ printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
+
+ switch (req_state) {
+ case TEVENT_REQ_RECEIVED:
+ break;
+ default:
+ printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
+ return;
+ }
+
+ if (state->done) {
+ printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+ tevent_threadpool_req_unset_pending(req);
+ return;
+ }
+
+ printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+
+ threadpool_job_cancel(state->job);
+
+ /*
+ * Keep around the state of the deleted request until the
+ * request has finished in the helper
+ * thread. tevent_threadpool_handler will destroy it.
+ */
+ pool->num_orphaned++;
+ tevent_threadpool_req_unset_pending(req);
+ pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
+ &req->data);
+}
+
+struct tevent_req *tevent_threadpool_send(struct tevent_threadpool *pool,
+ void (*fn)(void *private_data),
+ void *private_data)
+{
+ struct tevent_req *req;
+ struct tevent_threadpool_state *state;
+
+ req = tevent_req_create(pool, &state, struct tevent_threadpool_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->pool = pool;
+ state->job_id = tevent_threadpool_next_job_id(state->pool);
+ state->done = false;
+ state->job_private = private_data;
+
+ printf("tevent_threadpool_send: sheduling job %d\n", state->job_id);
+
+ state->job = threadpool_add_job(state,
+ state->pool->threadpool,
+ state->job_id,
+ fn,
+ state->job_private);
+ if (state->job == NULL) {
+ printf("tevent_threadpool_send: job %d state->job == NULL\n", state->job_id);
+ tevent_req_error(req, errno);
+ return tevent_req_post(req, pool->ev);
+ }
+ if (!tevent_threadpool_req_set_pending(req, pool, pool->ev)) {
+ printf("tevent_threadpool_send: job %d tevent_threadpool_req_set_pending failed\n", state->job_id);
+ tevent_req_oom(req);
+ return tevent_req_post(req, pool->ev);
+ }
+ return req;
+}
+
+static void tevent_threadpool_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags,
+ void *private_data)
+{
+ struct tevent_threadpool *pool = talloc_get_type_abort(
+ private_data, struct tevent_threadpool);
+ struct tevent_threadpool_state *orphaned_state;
+ int i, num_pending;
+ int job_id;
+
+ if (threadpool_finished_jobs(pool->threadpool, &job_id, 1) < 0) {
+ return;
+ }
+
+ printf("tevent_threadpool_handler: %d finished (num orphaned: %d)\n",
+ job_id, pool->num_orphaned);
+
+ num_pending = talloc_array_length(pool->pending);
+
+ for (i=0; i<num_pending; i++) {
+ struct tevent_threadpool_state *state = tevent_req_data(
+ pool->pending[i], struct tevent_threadpool_state);
+
+ printf("tevent_threadpool_handler: %d pending\n", state->job_id);
+ if (job_id == state->job_id) {
+ state->done = true;
+ if (threadpool_job_is_cancelled(state->job)) {
+ printf("tevent_threadpool_handler: %d cancelled\n", job_id);
+ tevent_req_error(pool->pending[i], EINTR);
+ tevent_req_post(pool->pending[i], ev);
+ return;
+ }
+ printf("tevent_threadpool_handler: %d done\n", job_id);
+ tevent_req_done(pool->pending[i]);
+ return;
+ }
+ }
+
+ printf("tevent_threadpool_handler: num orphaned: %d\n", pool->num_orphaned);
+
+ for (i = 0; i < pool->num_orphaned; i++) {
+ if (job_id == pool->orphaned[i]->job_id) {
+ break;
+ }
+ }
+ if (i == pool->num_orphaned) {
+ printf("tevent_threadpool_handler: done\n");
+ return;
+ }
+
+ printf("tevent_threadpool_handler: freeing orphaned: %d\n", pool->num_orphaned);
+
+ orphaned_state = pool->orphaned[i];
+ if (i < pool->num_orphaned - 1) {
+ pool->orphaned[i] = pool->orphaned[pool->num_orphaned - 1];
+ }
+ pool->num_orphaned--;
+ tevent_threadpool_state_unset_orphaned(orphaned_state);
+ TALLOC_FREE(orphaned_state);
+}
+
+int tevent_threadpool_recv(struct tevent_req *req, int *perrno)
+{
+ enum tevent_req_state req_state;
+ uint64_t error;
+ struct tevent_threadpool_state *state = tevent_req_data(
+ req, struct tevent_threadpool_state);
+
+ if (!tevent_req_is_error(req, &req_state, &error)) {
+ printf("tevent_threadpool_recv: %d done\n", state->job_id);
+ tevent_req_received(req);
+ return 0;
+ }
+
+ printf("tevent_threadpool_recv: %d failed\n", state->job_id);
+
+ switch (req_state) {
+ case TEVENT_REQ_NO_MEMORY:
+ *perrno = ENOMEM;
+ break;
+ case TEVENT_REQ_TIMED_OUT:
+ *perrno = ETIMEDOUT;
+ break;
+ case TEVENT_REQ_USER_ERROR:
+ *perrno = (int)error;
+ break;
+ default:
+ *perrno = EIO;
+ break;
+ }
+
+ tevent_req_received(req);
+ return -1;
+}
+
#else
/* !HAVE_PTHREAD */
struct tevent_thread_proxy *tevent_thread_proxy_create(
{
;
}
+
+struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int max_threads)
+{
+ return NULL;
+}
+
+struct tevent_req *tevent_threadpool_send(struct tevent_threadpool *pool,
+ void (*fn)(void *private_data),
+ void *private_data)
+{
+ return NULL;
+}
+
+bool tevent_threadpool_recv(struct tevent_req *req)
+{
+ return false;
+}
#endif
static int tevent_threaded_context_destructor(
SRC = '''tevent.c tevent_debug.c tevent_fd.c tevent_immediate.c
tevent_queue.c tevent_req.c tevent_wrapper.c
- tevent_poll.c tevent_threads.c
+ tevent_poll.c tevent_threads.c tevent_threadpool.c
tevent_signal.c tevent_standard.c tevent_timed.c tevent_util.c tevent_wakeup.c'''
if bld.CONFIG_SET('HAVE_EPOLL'):