--- /dev/null
+/*
+ tevent event library.
+
+ Copyright (C) Jeremy Allison 2015
+
+ ** NOTE! The following LGPL license applies to the tevent
+ ** library. This does NOT imply that all of Samba is released
+ ** under the LGPL
+
+ This library is free software; you can redistribute it and/or
+ modify it under the terms of the GNU Lesser General Public
+ License as published by the Free Software Foundation; either
+ version 3 of the License, or (at your option) any later version.
+
+ This library is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
+ Lesser General Public License for more details.
+
+ You should have received a copy of the GNU Lesser General Public
+ License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "talloc.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+#include "lib/util/dlinklist.h"
+
+#include <pthread.h>
+
+struct tevent_threadpool;
+
+struct tevent_threadpool_job {
+ struct tevent_threadpool_job *prev, *next;
+ struct tevent_threadpool *pool;
+
+ struct {
+ struct tevent_immediate *im;
+ bool busy;
+ bool done;
+ int ret;
+ //bool cancel;
+ } internal;
+
+ struct {
+ const struct tevent_threadpool_job_description *desc;
+ void *arg;
+ } state;
+};
+
+struct tevent_threadpool {
+ struct tevent_threadpool *prev, *next;
+ struct tevent_context *ev;
+
+ size_t max_threads;
+ struct tevent_threadpool_job *jobs;
+ struct tevent_queue *job_queue;
+
+};
+
+static void tevent_threadpool_handler(struct tevent_context *ev,
+ struct tevent_fd *fde,
+ uint16_t flags, void *private_data);
+
+static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
+{
+ tevent_queue_stop(pool->job_queue);
+
+ //TODO DLIST_REMOVE all pool->jobs
+
+#if 0
+ printf("tevent_threadpool_destructor\n");
+ while (talloc_array_length(pool->pending) != 0) {
+ /* No TALLOC_FREE here */
+ talloc_free(pool->pending[0]);
+ }
+
+ while (pool->num_orphaned != 0) {
+ /*
+ * We've got jobs in the queue for which the tevent_req has
+ * been finished already. Wait for all of them to finish.
+ */
+ tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
+ }
+
+ threadpool_destroy(pool->threadpool);
+ pool->threadpool = NULL;
+#endif
+ return 0;
+}
+
+struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int max_threads,
+ const char *location)
+{
+ struct tevent_threadpool *pool;
+
+ if (ev->wrapper.glue != NULL) {
+ /*
+ * stacking of wrappers is not supported
+ */
+ tevent_debug(ev->wrapper.glue->main_ev, TEVENT_DEBUG_FATAL,
+ "%s: %s() stacking not allowed",
+ __func__, location);
+ errno = ENOSYS;
+ return NULL;
+ }
+
+ pool = talloc_zero(mem_ctx, struct tevent_threadpool);
+ if (pool == NULL) {
+ return NULL;
+ }
+ *pool = (struct tevent_threadpool) {
+ .ev = ev,
+ .max_threads = max_threads,
+ };
+
+ pool->job_queue = tevent_queue_create(pool, "job_queue");
+ if (pool->job_queue == NULL) {
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+
+ DLIST_ADD_END(ev->threads.pools, pool);
+
+ return pool;
+}
+#if 0
+static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
+{
+ int num_pending = talloc_array_length(pool->pending);
+ int result;
+
+ while (true) {
+ int i;
+
+ result = pool->next_job_id++;
+ if (result == 0) {
+ continue;
+ }
+
+ for (i = 0; i < num_pending; i++) {
+ struct tevent_threadpool_state *state = tevent_req_data(
+ pool->pending[i], struct tevent_threadpool_state);
+
+ if (result == state->job_id) {
+ break;
+ }
+ }
+ if (i == num_pending) {
+ return result;
+ }
+ }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state);
+
+static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
+ struct tevent_threadpool *pool,
+ struct tevent_context *ev)
+{
+ struct tevent_req **pending;
+ int num_pending, orphaned_array_length;
+
+ num_pending = talloc_array_length(pool->pending);
+ printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
+ num_pending);
+ pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
+ num_pending + 1);
+ if (pending == NULL) {
+ return false;
+ }
+ pending[num_pending] = req;
+ num_pending++;
+ pool->pending = pending;
+ tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
+
+ /*
+ * Make sure that the orphaned array of
+ * tevent_threadpool_state structs has enough space. A job can
+ * change from pending to orphaned in
+ * tevent_threadpool_cleanup, and to fail in a talloc
+ * destructor should be avoided if possible.
+ */
+
+ orphaned_array_length = talloc_array_length(pool->orphaned);
+ if (num_pending > orphaned_array_length) {
+ struct tevent_threadpool_state **orphaned;
+
+ orphaned = talloc_realloc(pool, pool->orphaned,
+ struct tevent_threadpool_state *,
+ orphaned_array_length + 1);
+ if (orphaned == NULL) {
+ tevent_threadpool_req_unset_pending(req);
+ return false;
+ }
+ pool->orphaned = orphaned;
+ }
+
+ if (pool->fde != NULL) {
+ return true;
+ }
+
+ pool->fde = tevent_add_fd(pool->ev,
+ pool,
+ pool->sig_fd, TEVENT_FD_READ,
+ tevent_threadpool_handler, pool);
+ if (pool->fde == NULL) {
+ tevent_threadpool_req_unset_pending(req);
+ return false;
+ }
+ return true;
+}
+
+static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
+{
+ struct tevent_threadpool *pool = state->pool;
+ int num_pending = talloc_array_length(pool->pending);
+
+ if (num_pending == 0 && pool->num_orphaned == 0) {
+ printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+ TALLOC_FREE(pool->fde);
+ }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
+{
+ struct tevent_threadpool_state *state = tevent_req_data(
+ req, struct tevent_threadpool_state);
+ struct tevent_threadpool *pool = state->pool;
+ int num_pending = talloc_array_length(pool->pending);
+ int i;
+
+ tevent_req_set_cleanup_fn(req, NULL);
+
+ if (num_pending == 1) {
+ if (pool->num_orphaned == 0) {
+ printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+ TALLOC_FREE(pool->fde);
+ }
+ TALLOC_FREE(pool->pending);
+ return;
+ }
+
+ for (i = 0; i < num_pending; i++) {
+ if (req == pool->pending[i]) {
+ break;
+ }
+ }
+ if (i == num_pending) {
+ return;
+ }
+ if (num_pending > 1) {
+ pool->pending[i] = pool->pending[num_pending - 1];
+ }
+ pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
+ num_pending - 1);
+}
+#endif
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+#if 0
+ struct tevent_threadpool_state *state = tevent_req_data(
+ req, struct tevent_threadpool_state);
+ //struct tevent_threadpool *pool = state->pool;
+
+ //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
+
+ switch (req_state) {
+ case TEVENT_REQ_RECEIVED:
+ break;
+ default:
+ // printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
+ return;
+ }
+
+ if (state->done) {
+ // printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+ tevent_threadpool_req_unset_pending(req);
+ return;
+ }
+
+ //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+ threadpool_job_cancel(state->job);
+
+ /*
+ * Keep around the state of the deleted request until the
+ * request has finished in the helper
+ * thread. tevent_threadpool_handler will destroy it.
+ */
+ pool->num_orphaned++;
+ tevent_threadpool_req_unset_pending(req);
+ pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
+ &req->data);
+#endif
+}
+
+static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
+{
+ if (job->pool == NULL) {
+ return 0;
+ }
+
+ //DLIST_REMOVE(job->pool, job);
+ job->pool = NULL;
+ return 0;
+}
+
+struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
+ const struct tevent_threadpool_job_description *desc,
+ void *pstate,
+ const char *location)
+{
+ struct tevent_threadpool_job *job;
+ void **ppstate = (void **)pstate;
+ void *state;
+ size_t payload;
+
+ payload = sizeof(struct tevent_immediate) + desc->type_size;
+ if (payload < sizeof(struct tevent_immediate)) {
+ /* overflow */
+ return NULL;
+ }
+
+ job = talloc_pooled_object(mem_ctx, struct tevent_threadpool_job,
+ 2, payload);
+ if (job == NULL) {
+ return NULL;
+ }
+
+ *job = (struct tevent_threadpool_job) {
+ .internal = {
+ .im = tevent_create_immediate(job),
+ },
+ .state = {
+ .desc = desc,
+ .arg = talloc_zero_size(job, desc->type_size),
+ },
+ };
+ talloc_set_name_const(job->state.arg, desc->type);
+
+ talloc_set_destructor(job, tevent_threadpool_job_destructor);
+
+ *ppstate = state;
+ return job;
+}
+
+struct metze_thread_args {
+ uint8_t tmp;
+};
+
+static int metze_thread_fn(struct metze_thread_args *args)
+{
+ return 0;
+}
+
+TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
+
+struct tevent_threadpool_job_state {
+ struct tevent_threadpool_job *job;
+};
+
+static void tevent_threadpool_job_cleanup(struct tevent_req *req,
+ enum tevent_req_state req_state)
+{
+ struct tevent_threadpool_job_state *state =
+ tevent_req_data(req,
+ struct tevent_threadpool_job_state);
+
+ switch (req_state) {
+ case TEVENT_REQ_RECEIVED:
+ break;
+ default:
+ return;
+ }
+
+
+ if (state->done) {
+ // printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+ tevent_threadpool_req_unset_pending(req);
+ return;
+ }
+
+ //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+ threadpool_job_cancel(state->job);
+
+ /*
+ * Keep around the state of the deleted request until the
+ * request has finished in the helper
+ * thread. tevent_threadpool_handler will destroy it.
+ */
+ pool->num_orphaned++;
+ tevent_threadpool_req_unset_pending(req);
+ pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
+ &req->data);
+#endif
+}
+struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tevent_threadpool *pool,
+ struct tevent_threadpool_job *job)
+{
+ struct tevent_req *req;
+ struct tevent_threadpool_job_state *state;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tevent_threadpool_job_state);
+ if (req == NULL) {
+ return NULL;
+ }
+ state->job = job;
+
+ return req;
+}
+
+int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
+{
+ struct tevent_threadpool_job_state *state =
+ tevent_req_data(req,
+ struct tevent_threadpool_job_state);
+ enum tevent_req_state req_state;
+ uint64_t error;
+
+ if (!tevent_req_is_error(req, &req_state, &error)) {
+ tevent_req_received(req);
+ return 0;
+ }
+
+ switch (req_state) {
+ case TEVENT_REQ_NO_MEMORY:
+ *perrno = ENOMEM;
+ break;
+ case TEVENT_REQ_TIMED_OUT:
+ *perrno = ETIMEDOUT;
+ break;
+ case TEVENT_REQ_USER_ERROR:
+ *perrno = (int)error;
+ break;
+ default:
+ *perrno = EIO;
+ break;
+ }
+
+ tevent_req_received(req);
+ return -1;
+}
}
}
-/*********************************************************************
- * Raw threadpool implementation
- *********************************************************************/
-
-struct threadpool_job {
- /* List of jobs */
- struct threadpool_job *prev, *next;
-
- int id;
- void (*fn)(void *private_data);
- void *private_data;
- bool cancel;
-};
-
-struct threadpool {
- /*
- * List threadpools for fork safety
- */
- struct threadpool *prev, *next;
-
- /*
- * Control access to this struct
- */
- pthread_mutex_t mutex;
-
- /*
- * Threads waiting for work do so here
- */
- pthread_cond_t condvar;
-
- /*
- * List of jobs
- */
- struct threadpool_job *jobs;
-
- /*
- * pipe for signalling
- */
- int sig_pipe[2];
-
- /*
- * indicator to worker threads that they should shut down
- */
- int shutdown;
-
- /*
- * maximum number of threads
- */
- int max_threads;
-
- /*
- * Number of threads
- */
- int num_threads;
-
- /*
- * Number of idle threads
- */
- int num_idle;
-
- /*
- * An array of threads that require joining.
- */
- int num_exited;
- pthread_t *exited; /* We alloc more */
-};
-
-static pthread_mutex_t threadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
-static struct threadpool *threadpools = NULL;
-static pthread_once_t threadpool_atfork_initialized = PTHREAD_ONCE_INIT;
-
-static void threadpool_prep_atfork(void);
-
-#if 0
-/*
- * Initialize a thread pool
- */
-
-static struct threadpool *threadpool_init(TALLOC_CTX *mem_ctx,
- unsigned max_threads)
-{
- struct threadpool *pool;
- int ret;
-
- pool = talloc_zero(mem_ctx, struct threadpool);
- if (pool == NULL) {
- return NULL;
- }
-
- ret = pipe(pool->sig_pipe);
- if (ret == -1) {
- TALLOC_FREE(pool);
- return NULL;
- }
-
- ret = pthread_mutex_init(&pool->mutex, NULL);
- if (ret != 0) {
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
- TALLOC_FREE(pool);
- return NULL;
- }
-
- ret = pthread_cond_init(&pool->condvar, NULL);
- if (ret != 0) {
- pthread_mutex_destroy(&pool->mutex);
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
- TALLOC_FREE(pool);
- return NULL;
- }
-
- pool->shutdown = 0;
- pool->num_threads = 0;
- pool->num_exited = 0;
- pool->exited = NULL;
- pool->max_threads = max_threads;
- pool->num_idle = 0;
-
- ret = pthread_mutex_lock(&threadpools_mutex);
- if (ret != 0) {
- pthread_cond_destroy(&pool->condvar);
- pthread_mutex_destroy(&pool->mutex);
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
- TALLOC_FREE(pool);
- return NULL;
- }
- DLIST_ADD(threadpools, pool);
-
- ret = pthread_mutex_unlock(&threadpools_mutex);
- assert(ret == 0);
-
- pthread_once(&threadpool_atfork_initialized, threadpool_prep_atfork);
-
- return pool;
-}
-
-static void threadpool_prepare(void)
-{
- int ret;
- struct threadpool *pool;
-
- ret = pthread_mutex_lock(&threadpools_mutex);
- assert(ret == 0);
-
- pool = threadpools;
-
- while (pool != NULL) {
- ret = pthread_mutex_lock(&pool->mutex);
- assert(ret == 0);
- pool = pool->next;
- }
-}
-
-static void threadpool_parent(void)
-{
- int ret;
- struct threadpool *pool;
-
- for (pool = DLIST_TAIL(threadpools);
- pool != NULL;
- pool = DLIST_PREV(pool)) {
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
- }
-
- ret = pthread_mutex_unlock(&threadpools_mutex);
- assert(ret == 0);
-}
-
-static void threadpool_child(void)
-{
- int ret;
- struct threadpool *pool;
-
- for (pool = DLIST_TAIL(threadpools);
- pool != NULL;
- pool = DLIST_PREV(pool)) {
-
- close(pool->sig_pipe[0]);
- close(pool->sig_pipe[1]);
-
- ret = pipe(pool->sig_pipe);
- assert(ret == 0);
-
- pool->num_threads = 0;
-
- pool->num_exited = 0;
- free(pool->exited);
- pool->exited = NULL;
-
- pool->num_idle = 0;
-
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
- }
-
- ret = pthread_mutex_unlock(&threadpools_mutex);
- assert(ret == 0);
-}
-
-static void threadpool_prep_atfork(void)
-{
- pthread_atfork(threadpool_prepare, threadpool_parent,
- threadpool_child);
-}
-
-/*
- * Return the file descriptor which becomes readable when a job has
- * finished
- */
-
-static int threadpool_signal_fd(struct threadpool *pool)
-{
- return pool->sig_pipe[0];
-}
-
-static void threadpool_job_cancel(struct threadpool_job *job)
-{
- job->cancel = true;
-}
-static bool threadpool_job_is_cancelled(struct threadpool_job *job)
-{
- return job->cancel;
-}
-
-/*
- * Do a pthread_join() on all children that have exited, pool->mutex must be
- * locked
- */
-static void threadpool_join_children(struct threadpool *pool)
-{
- int i;
-
- for (i=0; i<pool->num_exited; i++) {
- pthread_join(pool->exited[i], NULL);
- }
- pool->num_exited = 0;
-
- /*
- * Deliberately not free and NULL pool->exited. That will be
- * re-used by realloc later.
- */
-}
-
-/*
- * Fetch a finished job number from the signal pipe
- */
-
-static int threadpool_finished_jobs(struct threadpool *pool, int *jobids,
- unsigned num_jobids)
-{
- ssize_t to_read, nread;
-
- nread = -1;
- errno = EINTR;
-
- to_read = sizeof(int) * num_jobids;
-
- while ((nread == -1) && (errno == EINTR)) {
- nread = read(pool->sig_pipe[0], jobids, to_read);
- }
- if (nread == -1) {
- return -errno;
- }
- if ((nread % sizeof(int)) != 0) {
- return -EINVAL;
- }
- return nread / sizeof(int);
-}
-
-/*
- * Destroy a thread pool, finishing all threads working for it
- */
-
-static int threadpool_destroy(struct threadpool *pool)
-{
- int ret, ret1;
-
- ret = pthread_mutex_lock(&pool->mutex);
- if (ret != 0) {
- return ret;
- }
-
- if ((pool->jobs != NULL) || pool->shutdown) {
- ret = pthread_mutex_unlock(&pool->mutex);
- assert(ret == 0);
- return EBUSY;
- }
-
- if (pool->num_threads > 0) {
- /*
- * We have active threads, tell them to finish, wait for that.
- */
-
- pool->shutdown = 1;
-
- if (pool->num_idle > 0) {
- /*
- * Wake the idle threads. They will find
- * pool->shutdown to be set and exit themselves
- */
- ret = pthread_cond_broadcast(&pool->condvar);
- if (ret != 0) {
- pthread_mutex_unlock(&pool->mutex);
- return ret;
- }
- }
-
- while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
-
- if (pool->num_exited > 0) {
- threadpool_join_children(pool);
- continue;
- }
- /*
- * A thread that shuts down will also signal
- * pool->condvar
- */
- ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
- if (ret != 0) {
- pthread_mutex_unlock(&pool->mutex);
- return ret;
- }
- }
- }
-
- ret = pthread_mutex_unlock(&pool->mutex);
- if (ret != 0) {
- return ret;
- }
- ret = pthread_mutex_destroy(&pool->mutex);
- ret1 = pthread_cond_destroy(&pool->condvar);
-
- if (ret != 0) {
- return ret;
- }
- if (ret1 != 0) {
- return ret1;
- }
-
- ret = pthread_mutex_lock(&threadpools_mutex);
- if (ret != 0) {
- return ret;
- }
- DLIST_REMOVE(threadpools, pool);
- ret = pthread_mutex_unlock(&threadpools_mutex);
- assert(ret == 0);
-
- close(pool->sig_pipe[0]);
- pool->sig_pipe[0] = -1;
-
- close(pool->sig_pipe[1]);
- pool->sig_pipe[1] = -1;
-
- free(pool->exited);
- TALLOC_FREE(pool);
-
- return 0;
-}
-
-/*
- * Prepare for pthread_exit(), pool->mutex must be locked
- */
-static void threadpool_server_exit(struct threadpool *pool)
-{
- pthread_t *exited;
-
- pool->num_threads--;
-
- exited = (pthread_t *)realloc(
- pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
-
- if (exited == NULL) {
- /* lost a thread status */
- return;
- }
- pool->exited = exited;
-
- pool->exited[pool->num_exited] = pthread_self();
- pool->num_exited++;
-}
-
-static struct threadpool_job *threadpool_get_job(struct threadpool *p)
-{
- struct threadpool_job *job = p->jobs;
-
- if (job) {
- DLIST_REMOVE(p->jobs, job);
- }
- return job;
-}
-
-static struct threadpool_job *threadpool_put_job(TALLOC_CTX *mem_ctx,
- struct threadpool *p,
- int id,
- void (*fn)(void *private_data),
- void *private_data)
-{
- struct threadpool_job *job;
-
- job = talloc_zero(mem_ctx, struct threadpool_job);
- if (job == NULL) {
- return NULL;
- }
-
- job->id = id;
- job->fn = fn;
- job->private_data = private_data;
- DLIST_ADD_END(p->jobs, job, struct threadpool_job *);
-
- return job;
-}
-
-static void *threadpool_server(void *arg)
-{
- struct threadpool *pool = (struct threadpool *)arg;
- int res;
-
- res = pthread_mutex_lock(&pool->mutex);
- if (res != 0) {
- return NULL;
- }
-
- while (1) {
- struct timespec ts;
- struct threadpool_job *job;
-
- /*
- * idle-wait at most 1 second. If nothing happens in that
- * time, exit this thread.
- */
-
- clock_gettime(CLOCK_MONOTONIC, &ts);
- ts.tv_sec += 1;
-
- while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
-
- pool->num_idle++;
- res = pthread_cond_timedwait(
- &pool->condvar, &pool->mutex, &ts);
- pool->num_idle--;
-
- if (res == ETIMEDOUT) {
- if (pool->jobs == NULL) {
- /*
- * we timed out and still no work for
- * us. Exit.
- */
- threadpool_server_exit(pool);
- pthread_mutex_unlock(&pool->mutex);
- return NULL;
- }
-
- break;
- }
- assert(res == 0);
- }
-
- job = threadpool_get_job(pool);
- if (job != NULL) {
- ssize_t written;
- int sig_pipe = pool->sig_pipe[1];
-
- printf("threadpool_server: pulled job %d\n", job->id);
- /*
- * Do the work with the mutex unlocked
- */
-
- res = pthread_mutex_unlock(&pool->mutex);
- assert(res == 0);
-
- if (likely(!job->cancel)) {
- printf("threadpool_server: executing job %d\n", job->id);
- job->fn(job->private_data);
- } else {
- printf("threadpool_server: skipping cancelled job %d\n", job->id);
- }
-
- /*
- * Do the write without the lock, otherwise we
- * can deadlock with the scheduler
- */
- written = write(sig_pipe, &job->id, sizeof(job->id));
-
- res = pthread_mutex_lock(&pool->mutex);
- assert(res == 0);
-
- if (written != sizeof(int)) {
- threadpool_server_exit(pool);
- pthread_mutex_unlock(&pool->mutex);
- return NULL;
- }
- }
-
- if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
- /*
- * No more work to do and we're asked to shut down, so
- * exit
- */
- threadpool_server_exit(pool);
-
- if (pool->num_threads == 0) {
- /*
- * Ping the main thread waiting for all of us
- * workers to have quit.
- */
- pthread_cond_broadcast(&pool->condvar);
- }
-
- pthread_mutex_unlock(&pool->mutex);
- return NULL;
- }
- }
-}
-
-static struct threadpool_job *threadpool_add_job(TALLOC_CTX *mem_ctx,
- struct threadpool *pool,
- int job_id,
- void (*fn)(void *private_data),
- void *private_data)
-{
- pthread_t thread_id;
- int res;
- sigset_t mask, omask;
- struct threadpool_job *job;
-
- res = pthread_mutex_lock(&pool->mutex);
- assert(res == 0);
-
- if (pool->shutdown) {
- /*
- * Protect against the pool being shut down while
- * trying to add a job
- */
- printf("threadpool_add_job: pool is shutting down\n");
- res = pthread_mutex_unlock(&pool->mutex);
- assert(res == 0);
- return NULL;
- }
-
- /*
- * Just some cleanup under the mutex
- */
- threadpool_join_children(pool);
-
- /*
- * Add job to the end of the queue
- */
- job = threadpool_put_job(mem_ctx, pool, job_id, fn, private_data);
- if (job == NULL) {
- printf("threadpool_add_job: threadpool_put_job failed\n");
- pthread_mutex_unlock(&pool->mutex);
- errno = ENOMEM;
- return NULL;
- }
-
- if (pool->num_idle > 0) {
- /*
- * We have idle threads, wake one.
- */
- res = pthread_cond_signal(&pool->condvar);
- pthread_mutex_unlock(&pool->mutex);
- return job;
- }
-
- if ((pool->max_threads != 0) &&
- (pool->num_threads >= pool->max_threads)) {
- /*
- * No more new threads, we just queue the request
- */
- pthread_mutex_unlock(&pool->mutex);
- return job;
- }
-
- /*
- * Create a new worker thread. It should not receive any signals.
- */
-
- sigfillset(&mask);
-
- res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
- assert(res == 0);
-
- res = pthread_create(&thread_id, NULL, threadpool_server,
- (void *)pool);
- if (res == 0) {
- pool->num_threads += 1;
- }
-
- res = pthread_sigmask(SIG_SETMASK, &omask, NULL);
- assert(res == 0);
-
- pthread_mutex_unlock(&pool->mutex);
- return job;
-}
-
-#endif
-/*******************************************************************
- * tvent_threadpool_send()/recv() sugar
- *******************************************************************/
-
-struct tevent_threadpool_job {
- struct tevent_threadpool_job *prev, *next;
- struct tevent_threadpool *pool;
-
- struct {
- struct tevent_immediate *im;
- bool busy;
- bool done;
- int ret;
- //bool cancel;
- } internal;
-
- struct {
- const struct tevent_threadpool_job_description *desc;
- void *arg;
- } state;
-};
-
-struct tevent_threadpool {
- struct tevent_threadpool *prev, *next;
- struct tevent_context *ev;
-
- size_t max_threads;
- struct tevent_threadpool_job *jobs;
- struct tevent_queue *job_queue;
-
-};
-
-static void tevent_threadpool_handler(struct tevent_context *ev,
- struct tevent_fd *fde,
- uint16_t flags, void *private_data);
-
-static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
-{
- tevent_queue_stop(pool->job_queue);
-
- //TODO DLIST_REMOVE all pool->jobs
-
-#if 0
- printf("tevent_threadpool_destructor\n");
- while (talloc_array_length(pool->pending) != 0) {
- /* No TALLOC_FREE here */
- talloc_free(pool->pending[0]);
- }
-
- while (pool->num_orphaned != 0) {
- /*
- * We've got jobs in the queue for which the tevent_req has
- * been finished already. Wait for all of them to finish.
- */
- tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
- }
-
- threadpool_destroy(pool->threadpool);
- pool->threadpool = NULL;
-#endif
- return 0;
-}
-
-struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- int max_threads,
- const char *location)
-{
- struct tevent_threadpool *pool;
-
- if (ev->wrapper.glue != NULL) {
- /*
- * stacking of wrappers is not supported
- */
- tevent_debug(ev->wrapper.glue->main_ev, TEVENT_DEBUG_FATAL,
- "%s: %s() stacking not allowed",
- __func__, location);
- errno = ENOSYS;
- return NULL;
- }
-
- pool = talloc_zero(mem_ctx, struct tevent_threadpool);
- if (pool == NULL) {
- return NULL;
- }
- *pool = (struct tevent_threadpool) {
- .ev = ev,
- .max_threads = max_threads,
- };
-
- pool->job_queue = tevent_queue_create(pool, "job_queue");
- if (pool->job_queue == NULL) {
- TALLOC_FREE(pool);
- return NULL;
- }
-
- DLIST_ADD_END(ev->threads.pools, pool);
-
- return pool;
-}
-#if 0
-static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
-{
- int num_pending = talloc_array_length(pool->pending);
- int result;
-
- while (true) {
- int i;
-
- result = pool->next_job_id++;
- if (result == 0) {
- continue;
- }
-
- for (i = 0; i < num_pending; i++) {
- struct tevent_threadpool_state *state = tevent_req_data(
- pool->pending[i], struct tevent_threadpool_state);
-
- if (result == state->job_id) {
- break;
- }
- }
- if (i == num_pending) {
- return result;
- }
- }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state);
-
-static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
- struct tevent_threadpool *pool,
- struct tevent_context *ev)
-{
- struct tevent_req **pending;
- int num_pending, orphaned_array_length;
-
- num_pending = talloc_array_length(pool->pending);
- printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
- num_pending);
- pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
- num_pending + 1);
- if (pending == NULL) {
- return false;
- }
- pending[num_pending] = req;
- num_pending++;
- pool->pending = pending;
- tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
-
- /*
- * Make sure that the orphaned array of
- * tevent_threadpool_state structs has enough space. A job can
- * change from pending to orphaned in
- * tevent_threadpool_cleanup, and to fail in a talloc
- * destructor should be avoided if possible.
- */
-
- orphaned_array_length = talloc_array_length(pool->orphaned);
- if (num_pending > orphaned_array_length) {
- struct tevent_threadpool_state **orphaned;
-
- orphaned = talloc_realloc(pool, pool->orphaned,
- struct tevent_threadpool_state *,
- orphaned_array_length + 1);
- if (orphaned == NULL) {
- tevent_threadpool_req_unset_pending(req);
- return false;
- }
- pool->orphaned = orphaned;
- }
-
- if (pool->fde != NULL) {
- return true;
- }
-
- pool->fde = tevent_add_fd(pool->ev,
- pool,
- pool->sig_fd, TEVENT_FD_READ,
- tevent_threadpool_handler, pool);
- if (pool->fde == NULL) {
- tevent_threadpool_req_unset_pending(req);
- return false;
- }
- return true;
-}
-
-static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
-{
- struct tevent_threadpool *pool = state->pool;
- int num_pending = talloc_array_length(pool->pending);
-
- if (num_pending == 0 && pool->num_orphaned == 0) {
- printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
- TALLOC_FREE(pool->fde);
- }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
-{
- struct tevent_threadpool_state *state = tevent_req_data(
- req, struct tevent_threadpool_state);
- struct tevent_threadpool *pool = state->pool;
- int num_pending = talloc_array_length(pool->pending);
- int i;
-
- tevent_req_set_cleanup_fn(req, NULL);
-
- if (num_pending == 1) {
- if (pool->num_orphaned == 0) {
- printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
- TALLOC_FREE(pool->fde);
- }
- TALLOC_FREE(pool->pending);
- return;
- }
-
- for (i = 0; i < num_pending; i++) {
- if (req == pool->pending[i]) {
- break;
- }
- }
- if (i == num_pending) {
- return;
- }
- if (num_pending > 1) {
- pool->pending[i] = pool->pending[num_pending - 1];
- }
- pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
- num_pending - 1);
-}
-#endif
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state)
-{
- struct tevent_threadpool_state *state = tevent_req_data(
- req, struct tevent_threadpool_state);
- //struct tevent_threadpool *pool = state->pool;
-
- //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
-
- switch (req_state) {
- case TEVENT_REQ_RECEIVED:
- break;
- default:
- // printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
- return;
- }
-
- if (state->done) {
- // printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
- tevent_threadpool_req_unset_pending(req);
- return;
- }
-
- //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
-#if 0
- threadpool_job_cancel(state->job);
-
- /*
- * Keep around the state of the deleted request until the
- * request has finished in the helper
- * thread. tevent_threadpool_handler will destroy it.
- */
- pool->num_orphaned++;
- tevent_threadpool_req_unset_pending(req);
- pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
- &req->data);
-#endif
-}
-
-static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
-{
- if (state->pool == NULL) {
- return 0;
- }
-
- DLIST_REMOVE(job->pool, job);
- job->pool = NULL;
- return 0;
-}
-
-struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
- const struct tevent_threadpool_job_description *desc,
- void *pstate,
- const char *location)
-{
- struct tevent_threadpool_job *job;
- void **ppstate = (void **)pstate;
- void *state;
- size_t payload;
-
- payload = sizeof(struct tevent_immediate) + desc->type_size;
- if (payload < sizeof(struct tevent_immediate)) {
- /* overflow */
- return NULL;
- }
-
- job = talloc_pooled_object(mem_ctx, struct tevent_threadpool_job,
- 2, payload);
- if (job == NULL) {
- return NULL;
- }
-
- *job = (struct tevent_threadpool_job) {
- .internal = {
- .im = tevent_create_immediate(job),
- },
- .state = {
- .desc = desc,
- .arg = talloc_zero_size(job, desc->type_size),
- },
- };
- talloc_set_name_const(job->state.arg, desc->type);
-
- talloc_set_destructor(job, tevent_threadpool_job_destructor);
-
- *ppstate = state;
- return job;
-}
-
-struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
- int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
- void *pstate,
- size_t state_size,
- const char *type,
- const char *location)
-{
- return NULL;
-}
-
-struct metze_thread_args {
- uint8_t tmp;
-};
-
-static int metze_thread_fn(struct metze_thread_state *state)
-{
- return 0;
-}
-
-TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
-
-struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- struct tevent_threadpool *pool,
- struct tevent_threadpool_job *job)
-{
- struct tevent_req *req;
- struct tevent_threadpool_state *state;
-#if 0
-struct tevent_threadpool_job *j;
-struct metze_thread_state *jstate;
-
- j = tevent_threadpool_job_create(mem_ctx, metze_thread_fn,
- &jstate, struct metze_thread_state);
- if (j == NULL) {
-
- }
-#endif
- req = tevent_req_create(mem_ctx, &state, struct tevent_threadpool_state);
- if (req == NULL) {
- return NULL;
- }
- state->pool = pool;
- state->job_id = tevent_threadpool_next_job_id(state->pool);
- state->done = false;
- state->job = job;
-
- printf("tevent_threadpool_send: sheduling job %d\n", state->job_id);
-
-
- state->threadpool_job = threadpool_add_job(state,
- state->pool->threadpool,
- state->job_id,
- state->job->fn,
- state->job->state);
- if (state->threadpool_job == NULL) {
- printf("tevent_threadpool_send: job %d state->job == NULL\n", state->job_id);
- tevent_req_error(req, errno);
- return tevent_req_post(req, pool->ev);
- }
- if (!tevent_threadpool_req_set_pending(req, pool, pool->ev)) {
- printf("tevent_threadpool_send: job %d tevent_threadpool_req_set_pending failed\n", state->job_id);
- tevent_req_oom(req);
- return tevent_req_post(req, pool->ev);
- }
- return req;
-}
-
-static void tevent_threadpool_handler(struct tevent_context *ev,
- struct tevent_fd *fde,
- uint16_t flags,
- void *private_data)
-{
- struct tevent_threadpool *pool = talloc_get_type_abort(
- private_data, struct tevent_threadpool);
- struct tevent_threadpool_state *orphaned_state;
- int i, num_pending;
- int job_id;
-
- if (threadpool_finished_jobs(pool->threadpool, &job_id, 1) < 0) {
- return;
- }
-
- printf("tevent_threadpool_handler: %d finished (num orphaned: %d)\n",
- job_id, pool->num_orphaned);
-
- num_pending = talloc_array_length(pool->pending);
-
- for (i=0; i<num_pending; i++) {
- struct tevent_threadpool_state *state = tevent_req_data(
- pool->pending[i], struct tevent_threadpool_state);
-
- printf("tevent_threadpool_handler: %d pending\n", state->job_id);
- if (job_id == state->job_id) {
- state->done = true;
- if (threadpool_job_is_cancelled(state->job)) {
- printf("tevent_threadpool_handler: %d cancelled\n", job_id);
- tevent_req_error(pool->pending[i], EINTR);
- tevent_req_post(pool->pending[i], ev);
- return;
- }
- printf("tevent_threadpool_handler: %d done\n", job_id);
- tevent_req_done(pool->pending[i]);
- return;
- }
- }
-
- printf("tevent_threadpool_handler: num orphaned: %d\n", pool->num_orphaned);
-
- for (i = 0; i < pool->num_orphaned; i++) {
- if (job_id == pool->orphaned[i]->job_id) {
- break;
- }
- }
- if (i == pool->num_orphaned) {
- printf("tevent_threadpool_handler: done\n");
- return;
- }
-
- printf("tevent_threadpool_handler: freeing orphaned: %d\n", pool->num_orphaned);
-
- orphaned_state = pool->orphaned[i];
- if (i < pool->num_orphaned - 1) {
- pool->orphaned[i] = pool->orphaned[pool->num_orphaned - 1];
- }
- pool->num_orphaned--;
- tevent_threadpool_state_unset_orphaned(orphaned_state);
- TALLOC_FREE(orphaned_state);
-}
-
-int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
-{
- enum tevent_req_state req_state;
- uint64_t error;
- struct tevent_threadpool_state *state = tevent_req_data(
- req, struct tevent_threadpool_state);
-
- if (!tevent_req_is_error(req, &req_state, &error)) {
- printf("tevent_threadpool_recv: %d done\n", state->job_id);
- tevent_req_received(req);
- return 0;
- }
-
- printf("tevent_threadpool_recv: %d failed\n", state->job_id);
-
- switch (req_state) {
- case TEVENT_REQ_NO_MEMORY:
- *perrno = ENOMEM;
- break;
- case TEVENT_REQ_TIMED_OUT:
- *perrno = ETIMEDOUT;
- break;
- case TEVENT_REQ_USER_ERROR:
- *perrno = (int)error;
- break;
- default:
- *perrno = EIO;
- break;
- }
-
- tevent_req_received(req);
- return -1;
-}
-
-#else
-/* !HAVE_PTHREAD */
-struct tevent_thread_proxy *tevent_thread_proxy_create(
- struct tevent_context *dest_ev_ctx)
-{
- errno = ENOSYS;
- return NULL;
-}
-
-void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
- struct tevent_immediate **pp_im,
- tevent_immediate_handler_t handler,
- void *pp_private_data)
-{
- ;
-}
-
-struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- int max_threads)
-{
- return NULL;
-}
-
-struct tevent_req *tevent_threadpool_send(struct tevent_threadpool *pool,
- void (*fn)(void *private_data),
- void *private_data)
-{
- return NULL;
-}
-
-bool tevent_threadpool_recv(struct tevent_req *req)
-{
- return false;
-}
#endif
static int tevent_threaded_context_destructor(