more...
authorStefan Metzmacher <metze@samba.org>
Thu, 28 Jul 2016 18:52:32 +0000 (20:52 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:51:51 +0000 (09:51 +0200)
lib/tevent/tevent.h
lib/tevent/tevent_metze.c

index 901cd0e96c90ddd63b112e01eee0b3f20d79fcf5..81e71e61ea9aecb68ff108bcf249fb02225741b1 100644 (file)
@@ -2017,23 +2017,26 @@ struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
 int tevent_threadpool_destroy(struct tevent_threadpool *pool);
 
 #ifdef DOXYGEN
+#define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(fn)
 /**
  * @brief Create a job to run in a threadpool
  *
  * Only one instance of the job can run at a time,
  * but the job can be reused multiple times.
  *
-  If you want to run multiple instances of similar jobs
* If you want to run multiple instances of similar jobs
  * at the same time, you need to create a tevent_threadpool_job
  * for each instance.
  *
+ * You need to declare a job description using
+ * TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(fn) as a global.
+ *
  * @param[in] mem_ctx   The memory context for the result.
  * @param[in] fn        The function to execute the job (must be thread safe!)
  * @param[in] pstate    Pointer to the request state.
- * @param[in] type      The name of the job.
  */
 struct tevent_threadpool_job *tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
-                                                          #name,
+                                                          #fn,
                                                           void **pstate);
 #else
 struct tevent_threadpool_job_description {
index 975b7c7a63edbf9bdd63443dfac0116311095ae7..b860fcf69af64755cf0f3cb25db9676fc564512c 100644 (file)
@@ -57,18 +57,34 @@ struct tevent_threadpool {
        struct tevent_threadpool *prev, *next;
        struct tevent_context *ev;
 
-       size_t max_threads;
-       struct tevent_threadpool_job *jobs;
-       struct tevent_queue *job_queue;
+       struct {
+               struct tevent_queue *queue;
+
+               pthread_mutex_t mutex;
+               pthread_cond_t condvar;
+
+               size_t max_threads;
+               size_t active_threads;
 
+               size_t num_jobs;
+               struct tevent_threadpool_job *list;
+       } jobs;
 };
 
 static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
 {
-       tevent_queue_stop(pool->job_queue);
+       int ret;
+
+       tevent_queue_stop(pool->jobs.queue);
        
        //TODO DLIST_REMOVE all pool->jobs
 
+       ret = pthread_mutex_unlock(&pool->jobs.mutex);
+       if (ret != 0) {
+               return -1;
+       }
+       pthread_mutex_destroy(&pool->jobs.mutex);
+       pthread_cond_destroy(&pool->jobs.condvar);
        return 0;
 }
 
@@ -78,6 +94,7 @@ struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
                                                    const char *location)
 {
        struct tevent_threadpool *pool;
+       int ret;
 
        if (ev->wrapper.glue != NULL) {
                /*
@@ -96,18 +113,33 @@ struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
        }
        *pool = (struct tevent_threadpool) {
                .ev = ev,
-               .max_threads = max_threads,
+               .jobs = {
+                       .max_threads = max_threads,
+               },
        };
+       talloc_set_destructor(pool, tevent_threadpool_destructor);
+
+       pool->jobs.queue = tevent_queue_create(pool, "job_queue");
+       if (pool->jobs.queue == NULL) {
+               TALLOC_FREE(pool);
+               return NULL;
+       }
 
-       pool->job_queue = tevent_queue_create(pool, "job_queue");
-       if (pool->job_queue == NULL) {
+       ret = pthread_mutex_init(&pool->jobs.mutex, NULL);
+       if (ret != 0) {
+               TALLOC_FREE(pool);
+               return NULL;
+       }
+
+       ret = pthread_cond_init(&pool->jobs.condvar, NULL);
+       if (ret != 0) {
+               pthread_mutex_destroy(&pool->jobs.mutex);
                TALLOC_FREE(pool);
                return NULL;
        }
 
        DLIST_ADD_END(ev->threads.pools, pool);
 
-       talloc_set_destructor(pool, tevent_threadpool_destructor);
        return pool;
 }
 
@@ -119,7 +151,7 @@ static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
        }
 
        if (job->busy.pool != NULL) {
-               DLIST_REMOVE(job->busy.pool->jobs, job);
+               DLIST_REMOVE(job->busy.pool->jobs.list, job);
                job->busy.pool = NULL;
        }
 
@@ -193,7 +225,13 @@ struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
        tevent_req_set_cleanup_fn(req, tevent_threadpool_job_cleanup);
        tevent_req_set_cancel_fn(req, tevent_threadpool_job_cancel);
 
-       state->qe = tevent_queue_add_optimize_empty(pool->job_queue, ev, req,
+       DLIST_ADD(pool->jobs.list, job);
+       pool->jobs.num_jobs += 1;
+
+       job->busy.pool = pool;
+       job->busy.req = req;
+
+       state->qe = tevent_queue_add_optimize_empty(pool->jobs.queue, ev, req,
                                                    tevent_threadpool_job_trigger,
                                                    state);
        if (tevent_req_nomem(state->qe, req)) {