WIP: job API
[metze/samba/wip.git] / lib / tevent / tevent_threads.c
index 22b854c410f82d209f333ffbfd27aea8590a3671..28571311247898aa467212aeb650fb20e42d5d6f 100644 (file)
 #include "tevent.h"
 #include "tevent_internal.h"
 #include "tevent_util.h"
+#include "lib/util/dlinklist.h"
 
-#if defined(HAVE_PTHREAD)
-#include <pthread.h>
+#ifdef HAVE_PTHREAD
+#include "system/threads.h"
 
 struct tevent_immediate_list {
        struct tevent_immediate_list *next, *prev;
@@ -217,6 +218,18 @@ struct tevent_thread_proxy *tevent_thread_proxy_create(
        int pipefds[2];
        struct tevent_thread_proxy *tp;
 
+       if (dest_ev_ctx->wrapper.glue != NULL) {
+               /*
+                * stacking of wrappers is not supported
+                */
+               tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
+                            TEVENT_DEBUG_FATAL,
+                            "%s() not allowed on a wrapper context\n",
+                            __func__);
+               errno = EINVAL;
+               return NULL;
+       }
+
        tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
        if (tp == NULL) {
                return NULL;
@@ -354,20 +367,1265 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
                /* Notreached. */
        }
 }
-#else
-/* !HAVE_PTHREAD */
-struct tevent_thread_proxy *tevent_thread_proxy_create(
-               struct tevent_context *dest_ev_ctx)
+
+/*********************************************************************
+ * Raw threadpool implementation
+ *********************************************************************/
+
+struct threadpool_job {
+       /* List of jobs */
+       struct threadpool_job *prev, *next;
+
+       int id;
+       void (*fn)(void *private_data);
+       void *private_data;
+       bool cancel;
+};
+
+struct threadpool {
+       /*
+        * List threadpools for fork safety
+        */
+       struct threadpool *prev, *next;
+
+       /*
+        * Control access to this struct
+        */
+       pthread_mutex_t mutex;
+
+       /*
+        * Threads waiting for work do so here
+        */
+       pthread_cond_t condvar;
+
+       /*
+        * List of jobs
+        */
+       struct threadpool_job *jobs;
+
+       /*
+        * pipe for signalling
+        */
+       int sig_pipe[2];
+
+       /*
+        * indicator to worker threads that they should shut down
+        */
+       int shutdown;
+
+       /*
+        * maximum number of threads
+        */
+       int max_threads;
+
+       /*
+        * Number of threads
+        */
+       int num_threads;
+
+       /*
+        * Number of idle threads
+        */
+       int num_idle;
+
+       /*
+        * An array of threads that require joining.
+        */
+       int                     num_exited;
+       pthread_t               *exited; /* We alloc more */
+};
+
+static pthread_mutex_t threadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct threadpool *threadpools = NULL;
+static pthread_once_t threadpool_atfork_initialized = PTHREAD_ONCE_INIT;
+
+static void threadpool_prep_atfork(void);
+
+/*
+ * Initialize a thread pool
+ */
+
+static struct threadpool *threadpool_init(TALLOC_CTX *mem_ctx,
+                                         unsigned max_threads)
 {
-       errno = ENOSYS;
-       return NULL;
+       struct threadpool *pool;
+       int ret;
+
+       pool = talloc_zero(mem_ctx, struct threadpool);
+       if (pool == NULL) {
+               return NULL;
+       }
+
+       ret = pipe(pool->sig_pipe);
+       if (ret == -1) {
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+
+       ret = pthread_mutex_init(&pool->mutex, NULL);
+       if (ret != 0) {
+               close(pool->sig_pipe[0]);
+               close(pool->sig_pipe[1]);
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+
+       ret = pthread_cond_init(&pool->condvar, NULL);
+       if (ret != 0) {
+               pthread_mutex_destroy(&pool->mutex);
+               close(pool->sig_pipe[0]);
+               close(pool->sig_pipe[1]);
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+
+       pool->shutdown = 0;
+       pool->num_threads = 0;
+       pool->num_exited = 0;
+       pool->exited = NULL;
+       pool->max_threads = max_threads;
+       pool->num_idle = 0;
+
+       ret = pthread_mutex_lock(&threadpools_mutex);
+       if (ret != 0) {
+               pthread_cond_destroy(&pool->condvar);
+               pthread_mutex_destroy(&pool->mutex);
+               close(pool->sig_pipe[0]);
+               close(pool->sig_pipe[1]);
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+       DLIST_ADD(threadpools, pool);
+
+       ret = pthread_mutex_unlock(&threadpools_mutex);
+       assert(ret == 0);
+
+       pthread_once(&threadpool_atfork_initialized, threadpool_prep_atfork);
+
+       return pool;
 }
 
-void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
-                                 struct tevent_immediate **pp_im,
-                                 tevent_immediate_handler_t handler,
-                                 void *pp_private_data)
+static void threadpool_prepare(void)
 {
-       ;
+       int ret;
+       struct threadpool *pool;
+
+       ret = pthread_mutex_lock(&threadpools_mutex);
+       assert(ret == 0);
+
+       pool = threadpools;
+
+       while (pool != NULL) {
+               ret = pthread_mutex_lock(&pool->mutex);
+               assert(ret == 0);
+               pool = pool->next;
+       }
+}
+
+static void threadpool_parent(void)
+{
+       int ret;
+       struct threadpool *pool;
+
+       for (pool = DLIST_TAIL(threadpools);
+            pool != NULL;
+            pool = DLIST_PREV(pool)) {
+               ret = pthread_mutex_unlock(&pool->mutex);
+               assert(ret == 0);
+       }
+
+       ret = pthread_mutex_unlock(&threadpools_mutex);
+       assert(ret == 0);
+}
+
+static void threadpool_child(void)
+{
+       int ret;
+       struct threadpool *pool;
+
+       for (pool = DLIST_TAIL(threadpools);
+            pool != NULL;
+            pool = DLIST_PREV(pool)) {
+
+               close(pool->sig_pipe[0]);
+               close(pool->sig_pipe[1]);
+
+               ret = pipe(pool->sig_pipe);
+               assert(ret == 0);
+
+               pool->num_threads = 0;
+
+               pool->num_exited = 0;
+               free(pool->exited);
+               pool->exited = NULL;
+
+               pool->num_idle = 0;
+
+               ret = pthread_mutex_unlock(&pool->mutex);
+               assert(ret == 0);
+       }
+
+       ret = pthread_mutex_unlock(&threadpools_mutex);
+       assert(ret == 0);
+}
+
+static void threadpool_prep_atfork(void)
+{
+       pthread_atfork(threadpool_prepare, threadpool_parent,
+                      threadpool_child);
+}
+
+/*
+ * Return the file descriptor which becomes readable when a job has
+ * finished
+ */
+
+static int threadpool_signal_fd(struct threadpool *pool)
+{
+       return pool->sig_pipe[0];
+}
+
+static void threadpool_job_cancel(struct threadpool_job *job)
+{
+       job->cancel = true;
+}
+
+static bool threadpool_job_is_cancelled(struct threadpool_job *job)
+{
+       return job->cancel;
+}
+
+/*
+ * Do a pthread_join() on all children that have exited, pool->mutex must be
+ * locked
+ */
+static void threadpool_join_children(struct threadpool *pool)
+{
+       int i;
+
+       for (i=0; i<pool->num_exited; i++) {
+               pthread_join(pool->exited[i], NULL);
+       }
+       pool->num_exited = 0;
+
+       /*
+        * Deliberately not free and NULL pool->exited. That will be
+        * re-used by realloc later.
+        */
+}
+
+/*
+ * Fetch a finished job number from the signal pipe
+ */
+
+static int threadpool_finished_jobs(struct threadpool *pool, int *jobids,
+                                   unsigned num_jobids)
+{
+       ssize_t to_read, nread;
+
+       nread = -1;
+       errno = EINTR;
+
+       to_read = sizeof(int) * num_jobids;
+
+       while ((nread == -1) && (errno == EINTR)) {
+               nread = read(pool->sig_pipe[0], jobids, to_read);
+       }
+       if (nread == -1) {
+               return -errno;
+       }
+       if ((nread % sizeof(int)) != 0) {
+               return -EINVAL;
+       }
+       return nread / sizeof(int);
+}
+
+/*
+ * Destroy a thread pool, finishing all threads working for it
+ */
+
+static int threadpool_destroy(struct threadpool *pool)
+{
+       int ret, ret1;
+
+       ret = pthread_mutex_lock(&pool->mutex);
+       if (ret != 0) {
+               return ret;
+       }
+
+       if ((pool->jobs != NULL) || pool->shutdown) {
+               ret = pthread_mutex_unlock(&pool->mutex);
+               assert(ret == 0);
+               return EBUSY;
+       }
+
+       if (pool->num_threads > 0) {
+               /*
+                * We have active threads, tell them to finish, wait for that.
+                */
+
+               pool->shutdown = 1;
+
+               if (pool->num_idle > 0) {
+                       /*
+                        * Wake the idle threads. They will find
+                        * pool->shutdown to be set and exit themselves
+                        */
+                       ret = pthread_cond_broadcast(&pool->condvar);
+                       if (ret != 0) {
+                               pthread_mutex_unlock(&pool->mutex);
+                               return ret;
+                       }
+               }
+
+               while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
+
+                       if (pool->num_exited > 0) {
+                               threadpool_join_children(pool);
+                               continue;
+                       }
+                       /*
+                        * A thread that shuts down will also signal
+                        * pool->condvar
+                        */
+                       ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
+                       if (ret != 0) {
+                               pthread_mutex_unlock(&pool->mutex);
+                               return ret;
+                       }
+               }
+       }
+
+       ret = pthread_mutex_unlock(&pool->mutex);
+       if (ret != 0) {
+               return ret;
+       }
+       ret = pthread_mutex_destroy(&pool->mutex);
+       ret1 = pthread_cond_destroy(&pool->condvar);
+
+       if (ret != 0) {
+               return ret;
+       }
+       if (ret1 != 0) {
+               return ret1;
+       }
+
+       ret = pthread_mutex_lock(&threadpools_mutex);
+       if (ret != 0) {
+               return ret;
+       }
+       DLIST_REMOVE(threadpools, pool);
+       ret = pthread_mutex_unlock(&threadpools_mutex);
+       assert(ret == 0);
+
+       close(pool->sig_pipe[0]);
+       pool->sig_pipe[0] = -1;
+
+       close(pool->sig_pipe[1]);
+       pool->sig_pipe[1] = -1;
+
+       free(pool->exited);
+       TALLOC_FREE(pool);
+
+       return 0;
+}
+
+/*
+ * Prepare for pthread_exit(), pool->mutex must be locked
+ */
+static void threadpool_server_exit(struct threadpool *pool)
+{
+       pthread_t *exited;
+
+       pool->num_threads--;
+
+       exited = (pthread_t *)realloc(
+               pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
+
+       if (exited == NULL) {
+               /* lost a thread status */
+               return;
+       }
+       pool->exited = exited;
+
+       pool->exited[pool->num_exited] = pthread_self();
+       pool->num_exited++;
+}
+
+static struct threadpool_job *threadpool_get_job(struct threadpool *p)
+{
+       struct threadpool_job *job = p->jobs;
+
+       if (job) {
+               DLIST_REMOVE(p->jobs, job);
+       }
+       return job;
+}
+
+static struct threadpool_job *threadpool_put_job(TALLOC_CTX *mem_ctx,
+                                                struct threadpool *p,
+                                                int id,
+                                                void (*fn)(void *private_data),
+                                                void *private_data)
+{
+       struct threadpool_job *job;
+
+       job = talloc_zero(mem_ctx, struct threadpool_job);
+       if (job == NULL) {
+               return NULL;
+       }
+
+       job->id = id;
+       job->fn = fn;
+       job->private_data = private_data;
+       DLIST_ADD_END(p->jobs, job, struct threadpool_job *);
+
+       return job;
+}
+
+static void *threadpool_server(void *arg)
+{
+       struct threadpool *pool = (struct threadpool *)arg;
+       int res;
+
+       res = pthread_mutex_lock(&pool->mutex);
+       if (res != 0) {
+               return NULL;
+       }
+
+       while (1) {
+               struct timespec ts;
+               struct threadpool_job *job;
+
+               /*
+                * idle-wait at most 1 second. If nothing happens in that
+                * time, exit this thread.
+                */
+
+               clock_gettime(CLOCK_MONOTONIC, &ts);
+               ts.tv_sec += 1;
+
+               while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
+
+                       pool->num_idle++;
+                       res = pthread_cond_timedwait(
+                               &pool->condvar, &pool->mutex, &ts);
+                       pool->num_idle--;
+
+                       if (res == ETIMEDOUT) {
+                               if (pool->jobs == NULL) {
+                                       /*
+                                        * we timed out and still no work for
+                                        * us. Exit.
+                                        */
+                                       threadpool_server_exit(pool);
+                                       pthread_mutex_unlock(&pool->mutex);
+                                       return NULL;
+                               }
+
+                               break;
+                       }
+                       assert(res == 0);
+               }
+
+               job = threadpool_get_job(pool);
+               if (job != NULL) {
+                       ssize_t written;
+                       int sig_pipe = pool->sig_pipe[1];
+
+                       printf("threadpool_server: pulled job %d\n", job->id);
+                       /*
+                        * Do the work with the mutex unlocked
+                        */
+
+                       res = pthread_mutex_unlock(&pool->mutex);
+                       assert(res == 0);
+
+                       if (likely(!job->cancel)) {
+                               printf("threadpool_server: executing job %d\n", job->id);
+                               job->fn(job->private_data);
+                       } else {
+                               printf("threadpool_server: skipping cancelled job %d\n", job->id);
+                       }
+
+                       /*
+                        * Do the write without the lock, otherwise we
+                        * can deadlock with the scheduler
+                        */
+                       written = write(sig_pipe, &job->id, sizeof(job->id));
+
+                       res = pthread_mutex_lock(&pool->mutex);
+                       assert(res == 0);
+
+                       if (written != sizeof(int)) {
+                               threadpool_server_exit(pool);
+                               pthread_mutex_unlock(&pool->mutex);
+                               return NULL;
+                       }
+               }
+
+               if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
+                       /*
+                        * No more work to do and we're asked to shut down, so
+                        * exit
+                        */
+                       threadpool_server_exit(pool);
+
+                       if (pool->num_threads == 0) {
+                               /*
+                                * Ping the main thread waiting for all of us
+                                * workers to have quit.
+                                */
+                               pthread_cond_broadcast(&pool->condvar);
+                       }
+
+                       pthread_mutex_unlock(&pool->mutex);
+                       return NULL;
+               }
+       }
+}
+
+static struct threadpool_job *threadpool_add_job(TALLOC_CTX *mem_ctx,
+                                                struct threadpool *pool,
+                                                int job_id,
+                                                void (*fn)(void *private_data),
+                                                void *private_data)
+{
+       pthread_t thread_id;
+       int res;
+       sigset_t mask, omask;
+       struct threadpool_job *job;
+
+       res = pthread_mutex_lock(&pool->mutex);
+       assert(res == 0);
+
+       if (pool->shutdown) {
+               /*
+                * Protect against the pool being shut down while
+                * trying to add a job
+                */
+               printf("threadpool_add_job: pool is shutting down\n");
+               res = pthread_mutex_unlock(&pool->mutex);
+               assert(res == 0);
+               return NULL;
+       }
+
+       /*
+        * Just some cleanup under the mutex
+        */
+       threadpool_join_children(pool);
+
+       /*
+        * Add job to the end of the queue
+        */
+       job = threadpool_put_job(mem_ctx, pool, job_id, fn, private_data);
+       if (job == NULL) {
+               printf("threadpool_add_job: threadpool_put_job failed\n");
+               pthread_mutex_unlock(&pool->mutex);
+               errno = ENOMEM;
+               return NULL;
+       }
+
+       if (pool->num_idle > 0) {
+               /*
+                * We have idle threads, wake one.
+                */
+               res = pthread_cond_signal(&pool->condvar);
+               pthread_mutex_unlock(&pool->mutex);
+               return job;
+       }
+
+       if ((pool->max_threads != 0) &&
+           (pool->num_threads >= pool->max_threads)) {
+               /*
+                * No more new threads, we just queue the request
+                */
+               pthread_mutex_unlock(&pool->mutex);
+               return job;
+       }
+
+       /*
+        * Create a new worker thread. It should not receive any signals.
+        */
+
+       sigfillset(&mask);
+
+        res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
+       assert(res == 0);
+
+       res = pthread_create(&thread_id, NULL, threadpool_server,
+                            (void *)pool);
+       if (res == 0) {
+               pool->num_threads += 1;
+       }
+
+        res = pthread_sigmask(SIG_SETMASK, &omask, NULL);
+       assert(res == 0);
+
+       pthread_mutex_unlock(&pool->mutex);
+       return job;
+}
+
+/*******************************************************************
+ * tvent_threadpool_send()/recv() sugar
+ *******************************************************************/
+
+struct tevent_threadpool_job {
+       tevent_threadpool_fn_t fn;
+       void *private_data;
+};
+
+struct tevent_threadpool_state {
+       struct tevent_threadpool *pool;
+       int job_id;
+       bool done;
+       struct tevent_threadpool_job *job;
+       struct threadpool_job *threadpool_job;
+};
+
+struct tevent_threadpool {
+       struct tevent_context *ev;
+       struct threadpool *threadpool;
+       int next_job_id;
+       int sig_fd;
+       struct tevent_req **pending;
+
+       struct tevent_threadpool_state **orphaned;
+       int num_orphaned;
+
+       struct tevent_fd *fde;
+};
+
+static void tevent_threadpool_handler(struct tevent_context *ev,
+                                     struct tevent_fd *fde,
+                                     uint16_t flags, void *private_data);
+
+static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
+{
+       printf("tevent_threadpool_destructor\n");
+       while (talloc_array_length(pool->pending) != 0) {
+               /* No TALLOC_FREE here */
+               talloc_free(pool->pending[0]);
+       }
+
+       while (pool->num_orphaned != 0) {
+               /*
+                * We've got jobs in the queue for which the tevent_req has
+                * been finished already. Wait for all of them to finish.
+                */
+               tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
+       }
+
+       threadpool_destroy(pool->threadpool);
+       pool->threadpool = NULL;
+
+       return 0;
+}
+
+struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+                                                  struct tevent_context *ev,
+                                                  int max_threads)
+{
+       struct tevent_threadpool *pool;
+
+       pool = talloc_zero(mem_ctx, struct tevent_threadpool);
+       if (pool == NULL) {
+               return NULL;
+       }
+       pool->ev = ev;
+
+       pool->threadpool = threadpool_init(pool, max_threads);
+       if (pool->threadpool == NULL) {
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+       talloc_set_destructor(pool, tevent_threadpool_destructor);
+
+       pool->sig_fd = threadpool_signal_fd(pool->threadpool);
+       if (pool->sig_fd == -1) {
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+
+       return pool;
+}
+
+static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
+{
+       int num_pending = talloc_array_length(pool->pending);
+       int result;
+
+       while (true) {
+               int i;
+
+               result = pool->next_job_id++;
+               if (result == 0) {
+                       continue;
+               }
+
+               for (i = 0; i < num_pending; i++) {
+                       struct tevent_threadpool_state *state = tevent_req_data(
+                               pool->pending[i], struct tevent_threadpool_state);
+
+                       if (result == state->job_id) {
+                               break;
+                       }
+               }
+               if (i == num_pending) {
+                       return result;
+               }
+       }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+                                         enum tevent_req_state req_state);
+
+static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
+                                             struct tevent_threadpool *pool,
+                                             struct tevent_context *ev)
+{
+       struct tevent_req **pending;
+       int num_pending, orphaned_array_length;
+
+       num_pending = talloc_array_length(pool->pending);
+       printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
+               num_pending);
+       pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
+                                num_pending + 1);
+       if (pending == NULL) {
+               return false;
+       }
+       pending[num_pending] = req;
+       num_pending++;
+       pool->pending = pending;
+       tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
+
+       /*
+        * Make sure that the orphaned array of
+        * tevent_threadpool_state structs has enough space. A job can
+        * change from pending to orphaned in
+        * tevent_threadpool_cleanup, and to fail in a talloc
+        * destructor should be avoided if possible.
+        */
+
+       orphaned_array_length = talloc_array_length(pool->orphaned);
+       if (num_pending > orphaned_array_length) {
+               struct tevent_threadpool_state **orphaned;
+
+               orphaned = talloc_realloc(pool, pool->orphaned,
+                                         struct tevent_threadpool_state *,
+                                         orphaned_array_length + 1);
+               if (orphaned == NULL) {
+                       tevent_threadpool_req_unset_pending(req);
+                       return false;
+               }
+               pool->orphaned = orphaned;
+       }
+
+       if (pool->fde != NULL) {
+               return true;
+       }
+
+       pool->fde = tevent_add_fd(pool->ev,
+                                 pool,
+                                 pool->sig_fd, TEVENT_FD_READ,
+                                 tevent_threadpool_handler, pool);
+       if (pool->fde == NULL) {
+               tevent_threadpool_req_unset_pending(req);
+               return false;
+       }
+       return true;
+}
+
+static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
+{
+       struct tevent_threadpool *pool = state->pool;
+       int num_pending = talloc_array_length(pool->pending);
+
+       if (num_pending == 0 && pool->num_orphaned == 0) {
+               printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+               TALLOC_FREE(pool->fde);
+       }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
+{
+       struct tevent_threadpool_state *state = tevent_req_data(
+               req, struct tevent_threadpool_state);
+       struct tevent_threadpool *pool = state->pool;
+       int num_pending = talloc_array_length(pool->pending);
+       int i;
+
+       tevent_req_set_cleanup_fn(req, NULL);
+
+       if (num_pending == 1) {
+               if (pool->num_orphaned == 0) {
+                       printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+                       TALLOC_FREE(pool->fde);
+               }
+               TALLOC_FREE(pool->pending);
+               return;
+       }
+
+       for (i = 0; i < num_pending; i++) {
+               if (req == pool->pending[i]) {
+                       break;
+               }
+       }
+       if (i == num_pending) {
+               return;
+       }
+       if (num_pending > 1) {
+               pool->pending[i] = pool->pending[num_pending - 1];
+       }
+       pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
+                                     num_pending - 1);
+}
+
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+                                         enum tevent_req_state req_state)
+{
+       struct tevent_threadpool_state *state = tevent_req_data(
+               req, struct tevent_threadpool_state);
+       struct tevent_threadpool *pool = state->pool;
+
+       printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
+
+       switch (req_state) {
+       case TEVENT_REQ_RECEIVED:
+               break;
+       default:
+               printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
+               return;
+       }
+
+       if (state->done) {
+               printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+               tevent_threadpool_req_unset_pending(req);
+               return;
+       }
+
+       printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+
+       threadpool_job_cancel(state->job);
+
+       /*
+        * Keep around the state of the deleted request until the
+        * request has finished in the helper
+        * thread. tevent_threadpool_handler will destroy it.
+        */
+       pool->num_orphaned++;
+       tevent_threadpool_req_unset_pending(req);
+       pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
+                                                            &req->data);
+}
+
+struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
+                                                           tevent_threadpool_fn_t fn,
+                                                           void *pdata,
+                                                           size_t data_size,
+                                                           const char *type,
+                                                           const char *location)
+{
+       struct tevent_threadpool_job *job;
+       void **ppdata = (void **)pdata;
+       void *data;
+
+       job = talloc_pooled_object(mem_ctx, tevent_threadpool_job, 1,
+                                  data_size);
+       if (job == NULL) {
+               return NULL;
+       }
+       ZERO_STRUCTP(job);
+
+       job->fn = fn;
+
+       data = talloc_zero_size(job, data_size);
+       if (data == NULL) {
+               talloc_free(job);
+               return NULL;
+       }
+       talloc_set_name_const(job, type);
+
+       job->data = data;
+
+       talloc_set_destructor(job, tevent_threaded_job_destructor);
+
+       *ppdata = data;
+       return job;
+}
+
+struct tevent_req *tevent_threadpool_send(struct tevent_threadpool *pool,
+                                         struct tevent_threadpool_job *job)
+{
+       struct tevent_req *req;
+       struct tevent_threadpool_state *state;
+
+       req = tevent_req_create(pool, &state, struct tevent_threadpool_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->pool = pool;
+       state->job_id = tevent_threadpool_next_job_id(state->pool);
+       state->done = false;
+       state->job = job;
+
+       printf("tevent_threadpool_send: sheduling job %d\n", state->job_id);
+
+       state->threadpool_job = threadpool_add_job(state,
+                                                  state->pool->threadpool,
+                                                  state->job_id,
+                                                  state->job->fn,
+                                                  state->job->data);
+       if (state->threadpool_job == NULL) {
+               printf("tevent_threadpool_send: job %d state->job == NULL\n", state->job_id);
+               tevent_req_error(req, errno);
+               return tevent_req_post(req, pool->ev);
+       }
+       if (!tevent_threadpool_req_set_pending(req, pool, pool->ev)) {
+               printf("tevent_threadpool_send: job %d tevent_threadpool_req_set_pending failed\n", state->job_id);
+               tevent_req_oom(req);
+               return tevent_req_post(req, pool->ev);
+       }
+       return req;
+}
+
+static void tevent_threadpool_handler(struct tevent_context *ev,
+                                     struct tevent_fd *fde,
+                                     uint16_t flags,
+                                     void *private_data)
+{
+       struct tevent_threadpool *pool = talloc_get_type_abort(
+               private_data, struct tevent_threadpool);
+       struct tevent_threadpool_state *orphaned_state;
+       int i, num_pending;
+       int job_id;
+
+       if (threadpool_finished_jobs(pool->threadpool, &job_id, 1) < 0) {
+               return;
+       }
+
+       printf("tevent_threadpool_handler: %d finished (num orphaned: %d)\n",
+              job_id, pool->num_orphaned);
+
+       num_pending = talloc_array_length(pool->pending);
+
+       for (i=0; i<num_pending; i++) {
+               struct tevent_threadpool_state *state = tevent_req_data(
+                       pool->pending[i], struct tevent_threadpool_state);
+
+               printf("tevent_threadpool_handler: %d pending\n", state->job_id);
+               if (job_id == state->job_id) {
+                       state->done = true;
+                       if (threadpool_job_is_cancelled(state->job)) {
+                               printf("tevent_threadpool_handler: %d cancelled\n", job_id);
+                               tevent_req_error(pool->pending[i], EINTR);
+                               tevent_req_post(pool->pending[i], ev);
+                               return;
+                       }
+                       printf("tevent_threadpool_handler: %d done\n", job_id);
+                       tevent_req_done(pool->pending[i]);
+                       return;
+               }
+       }
+
+       printf("tevent_threadpool_handler: num orphaned: %d\n", pool->num_orphaned);
+
+       for (i = 0; i < pool->num_orphaned; i++) {
+               if (job_id == pool->orphaned[i]->job_id) {
+                       break;
+               }
+       }
+       if (i == pool->num_orphaned) {
+               printf("tevent_threadpool_handler: done\n");
+               return;
+       }
+
+       printf("tevent_threadpool_handler: freeing orphaned: %d\n", pool->num_orphaned);
+
+       orphaned_state = pool->orphaned[i];
+       if (i < pool->num_orphaned - 1) {
+               pool->orphaned[i] = pool->orphaned[pool->num_orphaned - 1];
+       }
+       pool->num_orphaned--;
+       tevent_threadpool_state_unset_orphaned(orphaned_state);
+       TALLOC_FREE(orphaned_state);
+}
+
+int tevent_threadpool_recv(struct tevent_req *req, int *perrno)
+{
+       enum tevent_req_state req_state;
+       uint64_t error;
+       struct tevent_threadpool_state *state = tevent_req_data(
+               req, struct tevent_threadpool_state);
+
+       if (!tevent_req_is_error(req, &req_state, &error)) {
+               printf("tevent_threadpool_recv: %d done\n", state->job_id);
+               tevent_req_received(req);
+               return 0;
+       }
+
+       printf("tevent_threadpool_recv: %d failed\n", state->job_id);
+
+       switch (req_state) {
+       case TEVENT_REQ_NO_MEMORY:
+               *perrno = ENOMEM;
+               break;
+       case TEVENT_REQ_TIMED_OUT:
+               *perrno = ETIMEDOUT;
+               break;
+       case TEVENT_REQ_USER_ERROR:
+               *perrno = (int)error;
+               break;
+       default:
+               *perrno = EIO;
+               break;
+       }
+
+       tevent_req_received(req);
+       return -1;
+}
+
+#else
+/* !HAVE_PTHREAD */
+struct tevent_thread_proxy *tevent_thread_proxy_create(
+               struct tevent_context *dest_ev_ctx)
+{
+       errno = ENOSYS;
+       return NULL;
+}
+
+void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
+                                 struct tevent_immediate **pp_im,
+                                 tevent_immediate_handler_t handler,
+                                 void *pp_private_data)
+{
+       ;
+}
+
+struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+                                                  struct tevent_context *ev,
+                                                  int max_threads)
+{
+       return NULL;
+}
+
+struct tevent_req *tevent_threadpool_send(struct tevent_threadpool *pool,
+                                         void (*fn)(void *private_data),
+                                         void *private_data)
+{
+       return NULL;
+}
+
+bool tevent_threadpool_recv(struct tevent_req *req)
+{
+       return false;
+}
+#endif
+
+static int tevent_threaded_context_destructor(
+       struct tevent_threaded_context *tctx)
+{
+       struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
+       int ret;
+
+       if (main_ev != NULL) {
+               DLIST_REMOVE(main_ev->threaded_contexts, tctx);
+       }
+
+       /*
+        * We have to coordinate with _tevent_threaded_schedule_immediate's
+        * unlock of the event_ctx_mutex. We're in the main thread here,
+        * and we can be scheduled before the helper thread finalizes its
+        * call _tevent_threaded_schedule_immediate. This means we would
+        * pthreadpool_destroy a locked mutex, which is illegal.
+        */
+       ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       return 0;
+}
+
+struct tevent_threaded_context *tevent_threaded_context_create(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+       struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
+       struct tevent_threaded_context *tctx;
+       int ret;
+
+       ret = tevent_common_wakeup_init(main_ev);
+       if (ret != 0) {
+               errno = ret;
+               return NULL;
+       }
+
+       tctx = talloc(mem_ctx, struct tevent_threaded_context);
+       if (tctx == NULL) {
+               return NULL;
+       }
+       tctx->event_ctx = ev;
+
+       ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
+       if (ret != 0) {
+               TALLOC_FREE(tctx);
+               return NULL;
+       }
+
+       DLIST_ADD(main_ev->threaded_contexts, tctx);
+       talloc_set_destructor(tctx, tevent_threaded_context_destructor);
+
+       return tctx;
+#else
+       errno = ENOSYS;
+       return NULL;
+#endif
+}
+
+static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
+{
+       if (im->event_ctx != NULL) {
+               abort();
+       }
+       return 0;
+}
+
+void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+                                        struct tevent_immediate *im,
+                                        tevent_immediate_handler_t handler,
+                                        void *private_data,
+                                        const char *handler_name,
+                                        const char *location)
+{
+#ifdef HAVE_PTHREAD
+       const char *create_location = im->create_location;
+       struct tevent_context *main_ev = NULL;
+       int ret, wakeup_fd;
+
+       ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       if (tctx->event_ctx == NULL) {
+               /*
+                * Our event context is already gone.
+                */
+               ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+               if (ret != 0) {
+                       abort();
+               }
+               return;
+       }
+
+       if ((im->event_ctx != NULL) || (handler == NULL)) {
+               abort();
+       }
+       if (im->destroyed) {
+               abort();
+       }
+       if (im->busy) {
+               abort();
+       }
+
+       main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
+
+       *im = (struct tevent_immediate) {
+               .event_ctx              = tctx->event_ctx,
+               .handler                = handler,
+               .private_data           = private_data,
+               .handler_name           = handler_name,
+               .create_location        = create_location,
+               .schedule_location      = location,
+       };
+
+       talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);
+
+       ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       DLIST_ADD_END(main_ev->scheduled_immediates, im);
+       wakeup_fd = main_ev->wakeup_fd;
+
+       ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       /*
+        * We might want to wake up the main thread under the lock. We
+        * had a slightly similar situation in pthreadpool, changed
+        * with 1c4284c7395f23. This is not exactly the same, as the
+        * wakeup is only a last-resort thing in case the main thread
+        * is sleeping. Doing the wakeup under the lock can easily
+        * lead to a contended mutex, which is much more expensive
+        * than a noncontended one. So I'd opt for the lower footprint
+        * initially. Maybe we have to change that later.
+        */
+       tevent_common_wakeup_fd(wakeup_fd);
+#else
+       /*
+        * tevent_threaded_context_create() returned NULL with ENOSYS...
+        */
+       abort();
+#endif
+}
+
+void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+       int ret;
+       ret = pthread_mutex_lock(&ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       while (ev->scheduled_immediates != NULL) {
+               struct tevent_immediate *im = ev->scheduled_immediates;
+               struct tevent_immediate copy = *im;
+
+               DLIST_REMOVE(ev->scheduled_immediates, im);
+
+               tevent_debug(ev, TEVENT_DEBUG_TRACE,
+                            "Schedule immediate event \"%s\": %p from thread into main\n",
+                            im->handler_name, im);
+               im->handler_name = NULL;
+               _tevent_schedule_immediate(im,
+                                          ev,
+                                          copy.handler,
+                                          copy.private_data,
+                                          copy.handler_name,
+                                          copy.schedule_location);
+       }
+
+       ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+#else
+       /*
+        * tevent_threaded_context_create() returned NULL with ENOSYS...
+        */
+       abort();
+#endif
 }
-#endif