more
authorStefan Metzmacher <metze@samba.org>
Thu, 28 Jul 2016 16:57:58 +0000 (18:57 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:51:51 +0000 (09:51 +0200)
lib/tevent/tevent.h
lib/tevent/tevent_metze.c [new file with mode: 0644]
lib/tevent/tevent_threads.c
lib/tevent/wscript

index 2825ec79b569ae07b4b31e8f71cc0e4dc8b99760..996991a5d20c2dda54c09d0857187efae1537632 100644 (file)
@@ -2063,10 +2063,10 @@ static const struct tevent_threadpool_job_description *__name##_description(void
        return &description.generic; \
 }
 
-struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
-                                                            const struct tevent_threadpool_job_description *desc,
-                                                            void *pstate,
-                                                            const char *location);
+struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
+                                                           const struct tevent_threadpool_job_description *desc,
+                                                           void *pstate,
+                                                           const char *location);
 
 #define tevent_threadpool_job_create(mem_ctx, name, pstate) \
        _tevent_threadpool_job_create((mem_ctx), __name##_description(), \
diff --git a/lib/tevent/tevent_metze.c b/lib/tevent/tevent_metze.c
new file mode 100644 (file)
index 0000000..811036d
--- /dev/null
@@ -0,0 +1,453 @@
+/*
+   tevent event library.
+
+   Copyright (C) Jeremy Allison 2015
+
+     ** NOTE! The following LGPL license applies to the tevent
+     ** library. This does NOT imply that all of Samba is released
+     ** under the LGPL
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 3 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, see <http://www.gnu.org/licenses/>.
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "talloc.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+#include "lib/util/dlinklist.h"
+
+#include <pthread.h>
+
+struct tevent_threadpool;
+
+struct tevent_threadpool_job {
+       struct tevent_threadpool_job *prev, *next;
+       struct tevent_threadpool *pool;
+
+       struct {
+               struct tevent_immediate *im;
+               bool busy;
+               bool done;
+               int ret;
+               //bool cancel;
+       } internal;
+
+       struct {
+               const struct tevent_threadpool_job_description *desc;
+               void *arg;
+       } state;
+};
+
+struct tevent_threadpool {
+       struct tevent_threadpool *prev, *next;
+       struct tevent_context *ev;
+
+       size_t max_threads;
+       struct tevent_threadpool_job *jobs;
+       struct tevent_queue *job_queue;
+
+};
+
+static void tevent_threadpool_handler(struct tevent_context *ev,
+                                     struct tevent_fd *fde,
+                                     uint16_t flags, void *private_data);
+
+static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
+{
+       tevent_queue_stop(pool->job_queue);
+       
+       //TODO DLIST_REMOVE all pool->jobs
+
+#if 0
+       printf("tevent_threadpool_destructor\n");
+       while (talloc_array_length(pool->pending) != 0) {
+               /* No TALLOC_FREE here */
+               talloc_free(pool->pending[0]);
+       }
+
+       while (pool->num_orphaned != 0) {
+               /*
+                * We've got jobs in the queue for which the tevent_req has
+                * been finished already. Wait for all of them to finish.
+                */
+               tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
+       }
+
+       threadpool_destroy(pool->threadpool);
+       pool->threadpool = NULL;
+#endif
+       return 0;
+}
+
+struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+                                                   struct tevent_context *ev,
+                                                   int max_threads,
+                                                   const char *location)
+{
+       struct tevent_threadpool *pool;
+
+       if (ev->wrapper.glue != NULL) {
+               /*
+                * stacking of wrappers is not supported
+                */
+               tevent_debug(ev->wrapper.glue->main_ev, TEVENT_DEBUG_FATAL,
+                            "%s: %s() stacking not allowed",
+                            __func__, location);
+               errno = ENOSYS;
+               return NULL;
+       }
+
+       pool = talloc_zero(mem_ctx, struct tevent_threadpool);
+       if (pool == NULL) {
+               return NULL;
+       }
+       *pool = (struct tevent_threadpool) {
+               .ev = ev,
+               .max_threads = max_threads,
+       };
+
+       pool->job_queue = tevent_queue_create(pool, "job_queue");
+       if (pool->job_queue == NULL) {
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+
+       DLIST_ADD_END(ev->threads.pools, pool);
+
+       return pool;
+}
+#if 0
+static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
+{
+       int num_pending = talloc_array_length(pool->pending);
+       int result;
+
+       while (true) {
+               int i;
+
+               result = pool->next_job_id++;
+               if (result == 0) {
+                       continue;
+               }
+
+               for (i = 0; i < num_pending; i++) {
+                       struct tevent_threadpool_state *state = tevent_req_data(
+                               pool->pending[i], struct tevent_threadpool_state);
+
+                       if (result == state->job_id) {
+                               break;
+                       }
+               }
+               if (i == num_pending) {
+                       return result;
+               }
+       }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+                                         enum tevent_req_state req_state);
+
+static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
+                                             struct tevent_threadpool *pool,
+                                             struct tevent_context *ev)
+{
+       struct tevent_req **pending;
+       int num_pending, orphaned_array_length;
+
+       num_pending = talloc_array_length(pool->pending);
+       printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
+               num_pending);
+       pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
+                                num_pending + 1);
+       if (pending == NULL) {
+               return false;
+       }
+       pending[num_pending] = req;
+       num_pending++;
+       pool->pending = pending;
+       tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
+
+       /*
+        * Make sure that the orphaned array of
+        * tevent_threadpool_state structs has enough space. A job can
+        * change from pending to orphaned in
+        * tevent_threadpool_cleanup, and to fail in a talloc
+        * destructor should be avoided if possible.
+        */
+
+       orphaned_array_length = talloc_array_length(pool->orphaned);
+       if (num_pending > orphaned_array_length) {
+               struct tevent_threadpool_state **orphaned;
+
+               orphaned = talloc_realloc(pool, pool->orphaned,
+                                         struct tevent_threadpool_state *,
+                                         orphaned_array_length + 1);
+               if (orphaned == NULL) {
+                       tevent_threadpool_req_unset_pending(req);
+                       return false;
+               }
+               pool->orphaned = orphaned;
+       }
+
+       if (pool->fde != NULL) {
+               return true;
+       }
+
+       pool->fde = tevent_add_fd(pool->ev,
+                                 pool,
+                                 pool->sig_fd, TEVENT_FD_READ,
+                                 tevent_threadpool_handler, pool);
+       if (pool->fde == NULL) {
+               tevent_threadpool_req_unset_pending(req);
+               return false;
+       }
+       return true;
+}
+
+static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
+{
+       struct tevent_threadpool *pool = state->pool;
+       int num_pending = talloc_array_length(pool->pending);
+
+       if (num_pending == 0 && pool->num_orphaned == 0) {
+               printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+               TALLOC_FREE(pool->fde);
+       }
+}
+
+static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
+{
+       struct tevent_threadpool_state *state = tevent_req_data(
+               req, struct tevent_threadpool_state);
+       struct tevent_threadpool *pool = state->pool;
+       int num_pending = talloc_array_length(pool->pending);
+       int i;
+
+       tevent_req_set_cleanup_fn(req, NULL);
+
+       if (num_pending == 1) {
+               if (pool->num_orphaned == 0) {
+                       printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
+                       TALLOC_FREE(pool->fde);
+               }
+               TALLOC_FREE(pool->pending);
+               return;
+       }
+
+       for (i = 0; i < num_pending; i++) {
+               if (req == pool->pending[i]) {
+                       break;
+               }
+       }
+       if (i == num_pending) {
+               return;
+       }
+       if (num_pending > 1) {
+               pool->pending[i] = pool->pending[num_pending - 1];
+       }
+       pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
+                                     num_pending - 1);
+}
+#endif
+static void tevent_threadpool_req_cleanup(struct tevent_req *req,
+                                         enum tevent_req_state req_state)
+{
+#if 0
+       struct tevent_threadpool_state *state = tevent_req_data(
+               req, struct tevent_threadpool_state);
+       //struct tevent_threadpool *pool = state->pool;
+
+       //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
+
+       switch (req_state) {
+       case TEVENT_REQ_RECEIVED:
+               break;
+       default:
+       //      printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
+               return;
+       }
+
+       if (state->done) {
+       //      printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+               tevent_threadpool_req_unset_pending(req);
+               return;
+       }
+
+       //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+       threadpool_job_cancel(state->job);
+
+       /*
+        * Keep around the state of the deleted request until the
+        * request has finished in the helper
+        * thread. tevent_threadpool_handler will destroy it.
+        */
+       pool->num_orphaned++;
+       tevent_threadpool_req_unset_pending(req);
+       pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
+                                                            &req->data);
+#endif
+}
+
+static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
+{
+       if (job->pool == NULL) {
+               return 0;
+       }
+
+       //DLIST_REMOVE(job->pool, job);
+       job->pool = NULL;
+       return 0;
+}
+
+struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
+                                                           const struct tevent_threadpool_job_description *desc,
+                                                           void *pstate,
+                                                           const char *location)
+{
+       struct tevent_threadpool_job *job;
+       void **ppstate = (void **)pstate;
+       void *state;
+       size_t payload;
+
+       payload = sizeof(struct tevent_immediate) + desc->type_size;
+       if (payload < sizeof(struct tevent_immediate)) {
+               /* overflow */
+               return NULL;
+       }
+
+       job = talloc_pooled_object(mem_ctx, struct tevent_threadpool_job,
+                                  2, payload);
+       if (job == NULL) {
+               return NULL;
+       }
+
+       *job = (struct tevent_threadpool_job) {
+               .internal = {
+                       .im = tevent_create_immediate(job),
+               },
+               .state = {
+                       .desc = desc,
+                       .arg = talloc_zero_size(job, desc->type_size),
+               },
+       };
+       talloc_set_name_const(job->state.arg, desc->type);
+
+       talloc_set_destructor(job, tevent_threadpool_job_destructor);
+
+       *ppstate = state;
+       return job;
+}
+
+struct metze_thread_args {
+       uint8_t tmp;
+};
+
+static int metze_thread_fn(struct metze_thread_args *args)
+{
+       return 0;
+}
+
+TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
+
+struct tevent_threadpool_job_state {
+       struct tevent_threadpool_job *job;
+};
+
+static void tevent_threadpool_job_cleanup(struct tevent_req *req,
+                                         enum tevent_req_state req_state)
+{
+       struct tevent_threadpool_job_state *state =
+               tevent_req_data(req,
+               struct tevent_threadpool_job_state);
+
+       switch (req_state) {
+       case TEVENT_REQ_RECEIVED:
+               break;
+       default:
+               return;
+       }
+
+
+       if (state->done) {
+       //      printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+               tevent_threadpool_req_unset_pending(req);
+               return;
+       }
+
+       //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+       threadpool_job_cancel(state->job);
+
+       /*
+        * Keep around the state of the deleted request until the
+        * request has finished in the helper
+        * thread. tevent_threadpool_handler will destroy it.
+        */
+       pool->num_orphaned++;
+       tevent_threadpool_req_unset_pending(req);
+       pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
+                                                            &req->data);
+#endif
+}
+struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
+                                             struct tevent_context *ev,
+                                             struct tevent_threadpool *pool,
+                                             struct tevent_threadpool_job *job)
+{
+       struct tevent_req *req;
+       struct tevent_threadpool_job_state *state;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct tevent_threadpool_job_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->job = job;
+
+       return req;
+}
+
+int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
+{
+       struct tevent_threadpool_job_state *state =
+               tevent_req_data(req,
+               struct tevent_threadpool_job_state);
+       enum tevent_req_state req_state;
+       uint64_t error;
+
+       if (!tevent_req_is_error(req, &req_state, &error)) {
+               tevent_req_received(req);
+               return 0;
+       }
+
+       switch (req_state) {
+       case TEVENT_REQ_NO_MEMORY:
+               *perrno = ENOMEM;
+               break;
+       case TEVENT_REQ_TIMED_OUT:
+               *perrno = ETIMEDOUT;
+               break;
+       case TEVENT_REQ_USER_ERROR:
+               *perrno = (int)error;
+               break;
+       default:
+               *perrno = EIO;
+               break;
+       }
+
+       tevent_req_received(req);
+       return -1;
+}
index 7cc57c1e7d8d90178f0eed9bfcd1a130015b4c5f..83c1de0e9c9e53f3a07eb0b9a620bcb553ae6945 100644 (file)
@@ -368,1127 +368,6 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
        }
 }
 
-/*********************************************************************
- * Raw threadpool implementation
- *********************************************************************/
-
-struct threadpool_job {
-       /* List of jobs */
-       struct threadpool_job *prev, *next;
-
-       int id;
-       void (*fn)(void *private_data);
-       void *private_data;
-       bool cancel;
-};
-
-struct threadpool {
-       /*
-        * List threadpools for fork safety
-        */
-       struct threadpool *prev, *next;
-
-       /*
-        * Control access to this struct
-        */
-       pthread_mutex_t mutex;
-
-       /*
-        * Threads waiting for work do so here
-        */
-       pthread_cond_t condvar;
-
-       /*
-        * List of jobs
-        */
-       struct threadpool_job *jobs;
-
-       /*
-        * pipe for signalling
-        */
-       int sig_pipe[2];
-
-       /*
-        * indicator to worker threads that they should shut down
-        */
-       int shutdown;
-
-       /*
-        * maximum number of threads
-        */
-       int max_threads;
-
-       /*
-        * Number of threads
-        */
-       int num_threads;
-
-       /*
-        * Number of idle threads
-        */
-       int num_idle;
-
-       /*
-        * An array of threads that require joining.
-        */
-       int                     num_exited;
-       pthread_t               *exited; /* We alloc more */
-};
-
-static pthread_mutex_t threadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
-static struct threadpool *threadpools = NULL;
-static pthread_once_t threadpool_atfork_initialized = PTHREAD_ONCE_INIT;
-
-static void threadpool_prep_atfork(void);
-
-#if 0
-/*
- * Initialize a thread pool
- */
-
-static struct threadpool *threadpool_init(TALLOC_CTX *mem_ctx,
-                                         unsigned max_threads)
-{
-       struct threadpool *pool;
-       int ret;
-
-       pool = talloc_zero(mem_ctx, struct threadpool);
-       if (pool == NULL) {
-               return NULL;
-       }
-
-       ret = pipe(pool->sig_pipe);
-       if (ret == -1) {
-               TALLOC_FREE(pool);
-               return NULL;
-       }
-
-       ret = pthread_mutex_init(&pool->mutex, NULL);
-       if (ret != 0) {
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
-               TALLOC_FREE(pool);
-               return NULL;
-       }
-
-       ret = pthread_cond_init(&pool->condvar, NULL);
-       if (ret != 0) {
-               pthread_mutex_destroy(&pool->mutex);
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
-               TALLOC_FREE(pool);
-               return NULL;
-       }
-
-       pool->shutdown = 0;
-       pool->num_threads = 0;
-       pool->num_exited = 0;
-       pool->exited = NULL;
-       pool->max_threads = max_threads;
-       pool->num_idle = 0;
-
-       ret = pthread_mutex_lock(&threadpools_mutex);
-       if (ret != 0) {
-               pthread_cond_destroy(&pool->condvar);
-               pthread_mutex_destroy(&pool->mutex);
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
-               TALLOC_FREE(pool);
-               return NULL;
-       }
-       DLIST_ADD(threadpools, pool);
-
-       ret = pthread_mutex_unlock(&threadpools_mutex);
-       assert(ret == 0);
-
-       pthread_once(&threadpool_atfork_initialized, threadpool_prep_atfork);
-
-       return pool;
-}
-
-static void threadpool_prepare(void)
-{
-       int ret;
-       struct threadpool *pool;
-
-       ret = pthread_mutex_lock(&threadpools_mutex);
-       assert(ret == 0);
-
-       pool = threadpools;
-
-       while (pool != NULL) {
-               ret = pthread_mutex_lock(&pool->mutex);
-               assert(ret == 0);
-               pool = pool->next;
-       }
-}
-
-static void threadpool_parent(void)
-{
-       int ret;
-       struct threadpool *pool;
-
-       for (pool = DLIST_TAIL(threadpools);
-            pool != NULL;
-            pool = DLIST_PREV(pool)) {
-               ret = pthread_mutex_unlock(&pool->mutex);
-               assert(ret == 0);
-       }
-
-       ret = pthread_mutex_unlock(&threadpools_mutex);
-       assert(ret == 0);
-}
-
-static void threadpool_child(void)
-{
-       int ret;
-       struct threadpool *pool;
-
-       for (pool = DLIST_TAIL(threadpools);
-            pool != NULL;
-            pool = DLIST_PREV(pool)) {
-
-               close(pool->sig_pipe[0]);
-               close(pool->sig_pipe[1]);
-
-               ret = pipe(pool->sig_pipe);
-               assert(ret == 0);
-
-               pool->num_threads = 0;
-
-               pool->num_exited = 0;
-               free(pool->exited);
-               pool->exited = NULL;
-
-               pool->num_idle = 0;
-
-               ret = pthread_mutex_unlock(&pool->mutex);
-               assert(ret == 0);
-       }
-
-       ret = pthread_mutex_unlock(&threadpools_mutex);
-       assert(ret == 0);
-}
-
-static void threadpool_prep_atfork(void)
-{
-       pthread_atfork(threadpool_prepare, threadpool_parent,
-                      threadpool_child);
-}
-
-/*
- * Return the file descriptor which becomes readable when a job has
- * finished
- */
-
-static int threadpool_signal_fd(struct threadpool *pool)
-{
-       return pool->sig_pipe[0];
-}
-
-static void threadpool_job_cancel(struct threadpool_job *job)
-{
-       job->cancel = true;
-}
-static bool threadpool_job_is_cancelled(struct threadpool_job *job)
-{
-       return job->cancel;
-}
-
-/*
- * Do a pthread_join() on all children that have exited, pool->mutex must be
- * locked
- */
-static void threadpool_join_children(struct threadpool *pool)
-{
-       int i;
-
-       for (i=0; i<pool->num_exited; i++) {
-               pthread_join(pool->exited[i], NULL);
-       }
-       pool->num_exited = 0;
-
-       /*
-        * Deliberately not free and NULL pool->exited. That will be
-        * re-used by realloc later.
-        */
-}
-
-/*
- * Fetch a finished job number from the signal pipe
- */
-
-static int threadpool_finished_jobs(struct threadpool *pool, int *jobids,
-                                   unsigned num_jobids)
-{
-       ssize_t to_read, nread;
-
-       nread = -1;
-       errno = EINTR;
-
-       to_read = sizeof(int) * num_jobids;
-
-       while ((nread == -1) && (errno == EINTR)) {
-               nread = read(pool->sig_pipe[0], jobids, to_read);
-       }
-       if (nread == -1) {
-               return -errno;
-       }
-       if ((nread % sizeof(int)) != 0) {
-               return -EINVAL;
-       }
-       return nread / sizeof(int);
-}
-
-/*
- * Destroy a thread pool, finishing all threads working for it
- */
-
-static int threadpool_destroy(struct threadpool *pool)
-{
-       int ret, ret1;
-
-       ret = pthread_mutex_lock(&pool->mutex);
-       if (ret != 0) {
-               return ret;
-       }
-
-       if ((pool->jobs != NULL) || pool->shutdown) {
-               ret = pthread_mutex_unlock(&pool->mutex);
-               assert(ret == 0);
-               return EBUSY;
-       }
-
-       if (pool->num_threads > 0) {
-               /*
-                * We have active threads, tell them to finish, wait for that.
-                */
-
-               pool->shutdown = 1;
-
-               if (pool->num_idle > 0) {
-                       /*
-                        * Wake the idle threads. They will find
-                        * pool->shutdown to be set and exit themselves
-                        */
-                       ret = pthread_cond_broadcast(&pool->condvar);
-                       if (ret != 0) {
-                               pthread_mutex_unlock(&pool->mutex);
-                               return ret;
-                       }
-               }
-
-               while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
-
-                       if (pool->num_exited > 0) {
-                               threadpool_join_children(pool);
-                               continue;
-                       }
-                       /*
-                        * A thread that shuts down will also signal
-                        * pool->condvar
-                        */
-                       ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
-                       if (ret != 0) {
-                               pthread_mutex_unlock(&pool->mutex);
-                               return ret;
-                       }
-               }
-       }
-
-       ret = pthread_mutex_unlock(&pool->mutex);
-       if (ret != 0) {
-               return ret;
-       }
-       ret = pthread_mutex_destroy(&pool->mutex);
-       ret1 = pthread_cond_destroy(&pool->condvar);
-
-       if (ret != 0) {
-               return ret;
-       }
-       if (ret1 != 0) {
-               return ret1;
-       }
-
-       ret = pthread_mutex_lock(&threadpools_mutex);
-       if (ret != 0) {
-               return ret;
-       }
-       DLIST_REMOVE(threadpools, pool);
-       ret = pthread_mutex_unlock(&threadpools_mutex);
-       assert(ret == 0);
-
-       close(pool->sig_pipe[0]);
-       pool->sig_pipe[0] = -1;
-
-       close(pool->sig_pipe[1]);
-       pool->sig_pipe[1] = -1;
-
-       free(pool->exited);
-       TALLOC_FREE(pool);
-
-       return 0;
-}
-
-/*
- * Prepare for pthread_exit(), pool->mutex must be locked
- */
-static void threadpool_server_exit(struct threadpool *pool)
-{
-       pthread_t *exited;
-
-       pool->num_threads--;
-
-       exited = (pthread_t *)realloc(
-               pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
-
-       if (exited == NULL) {
-               /* lost a thread status */
-               return;
-       }
-       pool->exited = exited;
-
-       pool->exited[pool->num_exited] = pthread_self();
-       pool->num_exited++;
-}
-
-static struct threadpool_job *threadpool_get_job(struct threadpool *p)
-{
-       struct threadpool_job *job = p->jobs;
-
-       if (job) {
-               DLIST_REMOVE(p->jobs, job);
-       }
-       return job;
-}
-
-static struct threadpool_job *threadpool_put_job(TALLOC_CTX *mem_ctx,
-                                                struct threadpool *p,
-                                                int id,
-                                                void (*fn)(void *private_data),
-                                                void *private_data)
-{
-       struct threadpool_job *job;
-
-       job = talloc_zero(mem_ctx, struct threadpool_job);
-       if (job == NULL) {
-               return NULL;
-       }
-
-       job->id = id;
-       job->fn = fn;
-       job->private_data = private_data;
-       DLIST_ADD_END(p->jobs, job, struct threadpool_job *);
-
-       return job;
-}
-
-static void *threadpool_server(void *arg)
-{
-       struct threadpool *pool = (struct threadpool *)arg;
-       int res;
-
-       res = pthread_mutex_lock(&pool->mutex);
-       if (res != 0) {
-               return NULL;
-       }
-
-       while (1) {
-               struct timespec ts;
-               struct threadpool_job *job;
-
-               /*
-                * idle-wait at most 1 second. If nothing happens in that
-                * time, exit this thread.
-                */
-
-               clock_gettime(CLOCK_MONOTONIC, &ts);
-               ts.tv_sec += 1;
-
-               while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
-
-                       pool->num_idle++;
-                       res = pthread_cond_timedwait(
-                               &pool->condvar, &pool->mutex, &ts);
-                       pool->num_idle--;
-
-                       if (res == ETIMEDOUT) {
-                               if (pool->jobs == NULL) {
-                                       /*
-                                        * we timed out and still no work for
-                                        * us. Exit.
-                                        */
-                                       threadpool_server_exit(pool);
-                                       pthread_mutex_unlock(&pool->mutex);
-                                       return NULL;
-                               }
-
-                               break;
-                       }
-                       assert(res == 0);
-               }
-
-               job = threadpool_get_job(pool);
-               if (job != NULL) {
-                       ssize_t written;
-                       int sig_pipe = pool->sig_pipe[1];
-
-                       printf("threadpool_server: pulled job %d\n", job->id);
-                       /*
-                        * Do the work with the mutex unlocked
-                        */
-
-                       res = pthread_mutex_unlock(&pool->mutex);
-                       assert(res == 0);
-
-                       if (likely(!job->cancel)) {
-                               printf("threadpool_server: executing job %d\n", job->id);
-                               job->fn(job->private_data);
-                       } else {
-                               printf("threadpool_server: skipping cancelled job %d\n", job->id);
-                       }
-
-                       /*
-                        * Do the write without the lock, otherwise we
-                        * can deadlock with the scheduler
-                        */
-                       written = write(sig_pipe, &job->id, sizeof(job->id));
-
-                       res = pthread_mutex_lock(&pool->mutex);
-                       assert(res == 0);
-
-                       if (written != sizeof(int)) {
-                               threadpool_server_exit(pool);
-                               pthread_mutex_unlock(&pool->mutex);
-                               return NULL;
-                       }
-               }
-
-               if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
-                       /*
-                        * No more work to do and we're asked to shut down, so
-                        * exit
-                        */
-                       threadpool_server_exit(pool);
-
-                       if (pool->num_threads == 0) {
-                               /*
-                                * Ping the main thread waiting for all of us
-                                * workers to have quit.
-                                */
-                               pthread_cond_broadcast(&pool->condvar);
-                       }
-
-                       pthread_mutex_unlock(&pool->mutex);
-                       return NULL;
-               }
-       }
-}
-
-static struct threadpool_job *threadpool_add_job(TALLOC_CTX *mem_ctx,
-                                                struct threadpool *pool,
-                                                int job_id,
-                                                void (*fn)(void *private_data),
-                                                void *private_data)
-{
-       pthread_t thread_id;
-       int res;
-       sigset_t mask, omask;
-       struct threadpool_job *job;
-
-       res = pthread_mutex_lock(&pool->mutex);
-       assert(res == 0);
-
-       if (pool->shutdown) {
-               /*
-                * Protect against the pool being shut down while
-                * trying to add a job
-                */
-               printf("threadpool_add_job: pool is shutting down\n");
-               res = pthread_mutex_unlock(&pool->mutex);
-               assert(res == 0);
-               return NULL;
-       }
-
-       /*
-        * Just some cleanup under the mutex
-        */
-       threadpool_join_children(pool);
-
-       /*
-        * Add job to the end of the queue
-        */
-       job = threadpool_put_job(mem_ctx, pool, job_id, fn, private_data);
-       if (job == NULL) {
-               printf("threadpool_add_job: threadpool_put_job failed\n");
-               pthread_mutex_unlock(&pool->mutex);
-               errno = ENOMEM;
-               return NULL;
-       }
-
-       if (pool->num_idle > 0) {
-               /*
-                * We have idle threads, wake one.
-                */
-               res = pthread_cond_signal(&pool->condvar);
-               pthread_mutex_unlock(&pool->mutex);
-               return job;
-       }
-
-       if ((pool->max_threads != 0) &&
-           (pool->num_threads >= pool->max_threads)) {
-               /*
-                * No more new threads, we just queue the request
-                */
-               pthread_mutex_unlock(&pool->mutex);
-               return job;
-       }
-
-       /*
-        * Create a new worker thread. It should not receive any signals.
-        */
-
-       sigfillset(&mask);
-
-        res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
-       assert(res == 0);
-
-       res = pthread_create(&thread_id, NULL, threadpool_server,
-                            (void *)pool);
-       if (res == 0) {
-               pool->num_threads += 1;
-       }
-
-        res = pthread_sigmask(SIG_SETMASK, &omask, NULL);
-       assert(res == 0);
-
-       pthread_mutex_unlock(&pool->mutex);
-       return job;
-}
-
-#endif
-/*******************************************************************
- * tvent_threadpool_send()/recv() sugar
- *******************************************************************/
-
-struct tevent_threadpool_job {
-       struct tevent_threadpool_job *prev, *next;
-       struct tevent_threadpool *pool;
-
-       struct {
-               struct tevent_immediate *im;
-               bool busy;
-               bool done;
-               int ret;
-               //bool cancel;
-       } internal;
-
-       struct {
-               const struct tevent_threadpool_job_description *desc;
-               void *arg;
-       } state;
-};
-
-struct tevent_threadpool {
-       struct tevent_threadpool *prev, *next;
-       struct tevent_context *ev;
-
-       size_t max_threads;
-       struct tevent_threadpool_job *jobs;
-       struct tevent_queue *job_queue;
-
-};
-
-static void tevent_threadpool_handler(struct tevent_context *ev,
-                                     struct tevent_fd *fde,
-                                     uint16_t flags, void *private_data);
-
-static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
-{
-       tevent_queue_stop(pool->job_queue);
-
-       //TODO DLIST_REMOVE all pool->jobs
-
-#if 0
-       printf("tevent_threadpool_destructor\n");
-       while (talloc_array_length(pool->pending) != 0) {
-               /* No TALLOC_FREE here */
-               talloc_free(pool->pending[0]);
-       }
-
-       while (pool->num_orphaned != 0) {
-               /*
-                * We've got jobs in the queue for which the tevent_req has
-                * been finished already. Wait for all of them to finish.
-                */
-               tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
-       }
-
-       threadpool_destroy(pool->threadpool);
-       pool->threadpool = NULL;
-#endif
-       return 0;
-}
-
-struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
-                                                   struct tevent_context *ev,
-                                                   int max_threads,
-                                                   const char *location)
-{
-       struct tevent_threadpool *pool;
-
-       if (ev->wrapper.glue != NULL) {
-               /*
-                * stacking of wrappers is not supported
-                */
-               tevent_debug(ev->wrapper.glue->main_ev, TEVENT_DEBUG_FATAL,
-                            "%s: %s() stacking not allowed",
-                            __func__, location);
-               errno = ENOSYS;
-               return NULL;
-       }
-
-       pool = talloc_zero(mem_ctx, struct tevent_threadpool);
-       if (pool == NULL) {
-               return NULL;
-       }
-       *pool = (struct tevent_threadpool) {
-               .ev = ev,
-               .max_threads = max_threads,
-       };
-
-       pool->job_queue = tevent_queue_create(pool, "job_queue");
-       if (pool->job_queue == NULL) {
-               TALLOC_FREE(pool);
-               return NULL;
-       }
-
-       DLIST_ADD_END(ev->threads.pools, pool);
-
-       return pool;
-}
-#if 0
-static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
-{
-       int num_pending = talloc_array_length(pool->pending);
-       int result;
-
-       while (true) {
-               int i;
-
-               result = pool->next_job_id++;
-               if (result == 0) {
-                       continue;
-               }
-
-               for (i = 0; i < num_pending; i++) {
-                       struct tevent_threadpool_state *state = tevent_req_data(
-                               pool->pending[i], struct tevent_threadpool_state);
-
-                       if (result == state->job_id) {
-                               break;
-                       }
-               }
-               if (i == num_pending) {
-                       return result;
-               }
-       }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
-                                         enum tevent_req_state req_state);
-
-static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
-                                             struct tevent_threadpool *pool,
-                                             struct tevent_context *ev)
-{
-       struct tevent_req **pending;
-       int num_pending, orphaned_array_length;
-
-       num_pending = talloc_array_length(pool->pending);
-       printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
-               num_pending);
-       pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
-                                num_pending + 1);
-       if (pending == NULL) {
-               return false;
-       }
-       pending[num_pending] = req;
-       num_pending++;
-       pool->pending = pending;
-       tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
-
-       /*
-        * Make sure that the orphaned array of
-        * tevent_threadpool_state structs has enough space. A job can
-        * change from pending to orphaned in
-        * tevent_threadpool_cleanup, and to fail in a talloc
-        * destructor should be avoided if possible.
-        */
-
-       orphaned_array_length = talloc_array_length(pool->orphaned);
-       if (num_pending > orphaned_array_length) {
-               struct tevent_threadpool_state **orphaned;
-
-               orphaned = talloc_realloc(pool, pool->orphaned,
-                                         struct tevent_threadpool_state *,
-                                         orphaned_array_length + 1);
-               if (orphaned == NULL) {
-                       tevent_threadpool_req_unset_pending(req);
-                       return false;
-               }
-               pool->orphaned = orphaned;
-       }
-
-       if (pool->fde != NULL) {
-               return true;
-       }
-
-       pool->fde = tevent_add_fd(pool->ev,
-                                 pool,
-                                 pool->sig_fd, TEVENT_FD_READ,
-                                 tevent_threadpool_handler, pool);
-       if (pool->fde == NULL) {
-               tevent_threadpool_req_unset_pending(req);
-               return false;
-       }
-       return true;
-}
-
-static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
-{
-       struct tevent_threadpool *pool = state->pool;
-       int num_pending = talloc_array_length(pool->pending);
-
-       if (num_pending == 0 && pool->num_orphaned == 0) {
-               printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
-               TALLOC_FREE(pool->fde);
-       }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
-{
-       struct tevent_threadpool_state *state = tevent_req_data(
-               req, struct tevent_threadpool_state);
-       struct tevent_threadpool *pool = state->pool;
-       int num_pending = talloc_array_length(pool->pending);
-       int i;
-
-       tevent_req_set_cleanup_fn(req, NULL);
-
-       if (num_pending == 1) {
-               if (pool->num_orphaned == 0) {
-                       printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
-                       TALLOC_FREE(pool->fde);
-               }
-               TALLOC_FREE(pool->pending);
-               return;
-       }
-
-       for (i = 0; i < num_pending; i++) {
-               if (req == pool->pending[i]) {
-                       break;
-               }
-       }
-       if (i == num_pending) {
-               return;
-       }
-       if (num_pending > 1) {
-               pool->pending[i] = pool->pending[num_pending - 1];
-       }
-       pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
-                                     num_pending - 1);
-}
-#endif
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
-                                         enum tevent_req_state req_state)
-{
-       struct tevent_threadpool_state *state = tevent_req_data(
-               req, struct tevent_threadpool_state);
-       //struct tevent_threadpool *pool = state->pool;
-
-       //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
-
-       switch (req_state) {
-       case TEVENT_REQ_RECEIVED:
-               break;
-       default:
-       //      printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
-               return;
-       }
-
-       if (state->done) {
-       //      printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
-               tevent_threadpool_req_unset_pending(req);
-               return;
-       }
-
-       //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
-#if 0
-       threadpool_job_cancel(state->job);
-
-       /*
-        * Keep around the state of the deleted request until the
-        * request has finished in the helper
-        * thread. tevent_threadpool_handler will destroy it.
-        */
-       pool->num_orphaned++;
-       tevent_threadpool_req_unset_pending(req);
-       pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
-                                                            &req->data);
-#endif
-}
-
-static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
-{
-       if (state->pool == NULL) {
-               return 0;
-       }
-
-       DLIST_REMOVE(job->pool, job);
-       job->pool = NULL;
-       return 0;
-}
-
-struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
-                                                            const struct tevent_threadpool_job_description *desc,
-                                                            void *pstate,
-                                                            const char *location)
-{
-       struct tevent_threadpool_job *job;
-       void **ppstate = (void **)pstate;
-       void *state;
-       size_t payload;
-
-       payload = sizeof(struct tevent_immediate) + desc->type_size;
-       if (payload < sizeof(struct tevent_immediate)) {
-               /* overflow */
-               return NULL;
-       }
-
-       job = talloc_pooled_object(mem_ctx, struct tevent_threadpool_job,
-                                  2, payload);
-       if (job == NULL) {
-               return NULL;
-       }
-
-       *job = (struct tevent_threadpool_job) {
-               .internal = {
-                       .im = tevent_create_immediate(job),
-               },
-               .state = {
-                       .desc = desc,
-                       .arg = talloc_zero_size(job, desc->type_size),
-               },
-       };
-       talloc_set_name_const(job->state.arg, desc->type);
-
-       talloc_set_destructor(job, tevent_threadpool_job_destructor);
-
-       *ppstate = state;
-       return job;
-}
-
-struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
-                                                           int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
-                                                           void *pstate,
-                                                           size_t state_size,
-                                                           const char *type,
-                                                           const char *location)
-{
-       return NULL;
-}
-
-struct metze_thread_args {
-       uint8_t tmp;
-};
-
-static int metze_thread_fn(struct metze_thread_state *state)
-{
-       return 0;
-}
-
-TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
-
-struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
-                                             struct tevent_context *ev,
-                                             struct tevent_threadpool *pool,
-                                             struct tevent_threadpool_job *job)
-{
-       struct tevent_req *req;
-       struct tevent_threadpool_state *state;
-#if 0
-struct tevent_threadpool_job *j;
-struct metze_thread_state *jstate;
-
-       j = tevent_threadpool_job_create(mem_ctx, metze_thread_fn,
-                                    &jstate, struct metze_thread_state);
-       if (j == NULL) {
-
-       }
-#endif
-       req = tevent_req_create(mem_ctx, &state, struct tevent_threadpool_state);
-       if (req == NULL) {
-               return NULL;
-       }
-       state->pool = pool;
-       state->job_id = tevent_threadpool_next_job_id(state->pool);
-       state->done = false;
-       state->job = job;
-
-       printf("tevent_threadpool_send: sheduling job %d\n", state->job_id);
-
-
-       state->threadpool_job = threadpool_add_job(state,
-                                                  state->pool->threadpool,
-                                                  state->job_id,
-                                                  state->job->fn,
-                                                  state->job->state);
-       if (state->threadpool_job == NULL) {
-               printf("tevent_threadpool_send: job %d state->job == NULL\n", state->job_id);
-               tevent_req_error(req, errno);
-               return tevent_req_post(req, pool->ev);
-       }
-       if (!tevent_threadpool_req_set_pending(req, pool, pool->ev)) {
-               printf("tevent_threadpool_send: job %d tevent_threadpool_req_set_pending failed\n", state->job_id);
-               tevent_req_oom(req);
-               return tevent_req_post(req, pool->ev);
-       }
-       return req;
-}
-
-static void tevent_threadpool_handler(struct tevent_context *ev,
-                                     struct tevent_fd *fde,
-                                     uint16_t flags,
-                                     void *private_data)
-{
-       struct tevent_threadpool *pool = talloc_get_type_abort(
-               private_data, struct tevent_threadpool);
-       struct tevent_threadpool_state *orphaned_state;
-       int i, num_pending;
-       int job_id;
-
-       if (threadpool_finished_jobs(pool->threadpool, &job_id, 1) < 0) {
-               return;
-       }
-
-       printf("tevent_threadpool_handler: %d finished (num orphaned: %d)\n",
-              job_id, pool->num_orphaned);
-
-       num_pending = talloc_array_length(pool->pending);
-
-       for (i=0; i<num_pending; i++) {
-               struct tevent_threadpool_state *state = tevent_req_data(
-                       pool->pending[i], struct tevent_threadpool_state);
-
-               printf("tevent_threadpool_handler: %d pending\n", state->job_id);
-               if (job_id == state->job_id) {
-                       state->done = true;
-                       if (threadpool_job_is_cancelled(state->job)) {
-                               printf("tevent_threadpool_handler: %d cancelled\n", job_id);
-                               tevent_req_error(pool->pending[i], EINTR);
-                               tevent_req_post(pool->pending[i], ev);
-                               return;
-                       }
-                       printf("tevent_threadpool_handler: %d done\n", job_id);
-                       tevent_req_done(pool->pending[i]);
-                       return;
-               }
-       }
-
-       printf("tevent_threadpool_handler: num orphaned: %d\n", pool->num_orphaned);
-
-       for (i = 0; i < pool->num_orphaned; i++) {
-               if (job_id == pool->orphaned[i]->job_id) {
-                       break;
-               }
-       }
-       if (i == pool->num_orphaned) {
-               printf("tevent_threadpool_handler: done\n");
-               return;
-       }
-
-       printf("tevent_threadpool_handler: freeing orphaned: %d\n", pool->num_orphaned);
-
-       orphaned_state = pool->orphaned[i];
-       if (i < pool->num_orphaned - 1) {
-               pool->orphaned[i] = pool->orphaned[pool->num_orphaned - 1];
-       }
-       pool->num_orphaned--;
-       tevent_threadpool_state_unset_orphaned(orphaned_state);
-       TALLOC_FREE(orphaned_state);
-}
-
-int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
-{
-       enum tevent_req_state req_state;
-       uint64_t error;
-       struct tevent_threadpool_state *state = tevent_req_data(
-               req, struct tevent_threadpool_state);
-
-       if (!tevent_req_is_error(req, &req_state, &error)) {
-               printf("tevent_threadpool_recv: %d done\n", state->job_id);
-               tevent_req_received(req);
-               return 0;
-       }
-
-       printf("tevent_threadpool_recv: %d failed\n", state->job_id);
-
-       switch (req_state) {
-       case TEVENT_REQ_NO_MEMORY:
-               *perrno = ENOMEM;
-               break;
-       case TEVENT_REQ_TIMED_OUT:
-               *perrno = ETIMEDOUT;
-               break;
-       case TEVENT_REQ_USER_ERROR:
-               *perrno = (int)error;
-               break;
-       default:
-               *perrno = EIO;
-               break;
-       }
-
-       tevent_req_received(req);
-       return -1;
-}
-
-#else
-/* !HAVE_PTHREAD */
-struct tevent_thread_proxy *tevent_thread_proxy_create(
-               struct tevent_context *dest_ev_ctx)
-{
-       errno = ENOSYS;
-       return NULL;
-}
-
-void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
-                                 struct tevent_immediate **pp_im,
-                                 tevent_immediate_handler_t handler,
-                                 void *pp_private_data)
-{
-       ;
-}
-
-struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
-                                                  struct tevent_context *ev,
-                                                  int max_threads)
-{
-       return NULL;
-}
-
-struct tevent_req *tevent_threadpool_send(struct tevent_threadpool *pool,
-                                         void (*fn)(void *private_data),
-                                         void *private_data)
-{
-       return NULL;
-}
-
-bool tevent_threadpool_recv(struct tevent_req *req)
-{
-       return false;
-}
 #endif
 
 static int tevent_threaded_context_destructor(
index b04988af7d6e1a3a923b62221f710d05aa275560..4eec8d5959802ee8acdecff0863b7691c8dd829a 100644 (file)
@@ -78,7 +78,7 @@ def build(bld):
 
     SRC = '''tevent.c tevent_debug.c tevent_fd.c tevent_immediate.c
              tevent_queue.c tevent_req.c tevent_wrapper.c
-             tevent_poll.c tevent_threads.c tevent_threadpool.c
+             tevent_poll.c tevent_threads.c tevent_threadpool.c tevent_metze.c
              tevent_signal.c tevent_standard.c tevent_timed.c tevent_util.c tevent_wakeup.c'''
 
     if bld.CONFIG_SET('HAVE_EPOLL'):