int tevent_threadpool_destroy(struct tevent_threadpool *pool);
#ifdef DOXYGEN
+#define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(fn)
/**
* @brief Create a job to run in a threadpool
*
* Only one instance of the job can run at a time,
* but the job can be reused multiple times.
*
- If you want to run multiple instances of similar jobs
+ * If you want to run multiple instances of similar jobs
* at the same time, you need to create a tevent_threadpool_job
* for each instance.
*
+ * You need to declare a job description using
+ * TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(fn) as a global.
+ *
* @param[in] mem_ctx The memory context for the result.
* @param[in] fn The function to execute the job (must be thread safe!)
* @param[in] pstate Pointer to the request state.
- * @param[in] type The name of the job.
*/
struct tevent_threadpool_job *tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
- #name,
+ #fn,
void **pstate);
#else
struct tevent_threadpool_job_description {
struct tevent_threadpool *prev, *next;
struct tevent_context *ev;
- size_t max_threads;
- struct tevent_threadpool_job *jobs;
- struct tevent_queue *job_queue;
+ struct {
+ struct tevent_queue *queue;
+
+ pthread_mutex_t mutex;
+ pthread_cond_t condvar;
+
+ size_t max_threads;
+ size_t active_threads;
+ size_t num_jobs;
+ struct tevent_threadpool_job *list;
+ } jobs;
};
static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
{
- tevent_queue_stop(pool->job_queue);
+ int ret;
+
+ tevent_queue_stop(pool->jobs.queue);
//TODO DLIST_REMOVE all pool->jobs
+ ret = pthread_mutex_unlock(&pool->jobs.mutex);
+ if (ret != 0) {
+ return -1;
+ }
+ pthread_mutex_destroy(&pool->jobs.mutex);
+ pthread_cond_destroy(&pool->jobs.condvar);
return 0;
}
const char *location)
{
struct tevent_threadpool *pool;
+ int ret;
if (ev->wrapper.glue != NULL) {
/*
}
*pool = (struct tevent_threadpool) {
.ev = ev,
- .max_threads = max_threads,
+ .jobs = {
+ .max_threads = max_threads,
+ },
};
+ talloc_set_destructor(pool, tevent_threadpool_destructor);
+
+ pool->jobs.queue = tevent_queue_create(pool, "job_queue");
+ if (pool->jobs.queue == NULL) {
+ TALLOC_FREE(pool);
+ return NULL;
+ }
- pool->job_queue = tevent_queue_create(pool, "job_queue");
- if (pool->job_queue == NULL) {
+ ret = pthread_mutex_init(&pool->jobs.mutex, NULL);
+ if (ret != 0) {
+ TALLOC_FREE(pool);
+ return NULL;
+ }
+
+ ret = pthread_cond_init(&pool->jobs.condvar, NULL);
+ if (ret != 0) {
+ pthread_mutex_destroy(&pool->jobs.mutex);
TALLOC_FREE(pool);
return NULL;
}
DLIST_ADD_END(ev->threads.pools, pool);
- talloc_set_destructor(pool, tevent_threadpool_destructor);
return pool;
}
}
if (job->busy.pool != NULL) {
- DLIST_REMOVE(job->busy.pool->jobs, job);
+ DLIST_REMOVE(job->busy.pool->jobs.list, job);
job->busy.pool = NULL;
}
tevent_req_set_cleanup_fn(req, tevent_threadpool_job_cleanup);
tevent_req_set_cancel_fn(req, tevent_threadpool_job_cancel);
- state->qe = tevent_queue_add_optimize_empty(pool->job_queue, ev, req,
+ DLIST_ADD(pool->jobs.list, job);
+ pool->jobs.num_jobs += 1;
+
+ job->busy.pool = pool;
+ job->busy.req = req;
+
+ state->qe = tevent_queue_add_optimize_empty(pool->jobs.queue, ev, req,
tevent_threadpool_job_trigger,
state);
if (tevent_req_nomem(state->qe, req)) {