2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
33 struct pthreadpool_job {
35 void (*fn)(void *private_data);
41 * List pthreadpools for fork safety
43 struct pthreadpool *prev, *next;
46 * Control access to this struct
48 pthread_mutex_t mutex;
51 * Threads waiting for work do so here
53 pthread_cond_t condvar;
58 size_t jobs_array_len;
59 struct pthreadpool_job *jobs;
65 * Indicate job completion
67 int (*signal_fn)(int jobid,
68 void (*job_fn)(void *private_data),
69 void *job_fn_private_data,
71 void *signal_fn_private_data;
74 * indicator to worker threads that they should shut down
79 * maximum number of threads
89 * Number of idle threads
94 * An array of threads that require joining.
97 pthread_t *exited; /* We alloc more */
/*
 * Global registry of pools, consulted by the fork handlers.
 * pthreadpools_mutex protects the pthreadpools list.
 */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;
/* Guards the one-time call to pthreadpool_prep_atfork() */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
107 * Initialize a thread pool
110 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
111 int (*signal_fn)(int jobid,
112 void (*job_fn)(void *private_data),
113 void *job_fn_private_data,
115 void *signal_fn_private_data)
117 struct pthreadpool *pool;
120 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
124 pool->signal_fn = signal_fn;
125 pool->signal_fn_private_data = signal_fn_private_data;
127 pool->jobs_array_len = 4;
129 pool->jobs_array_len, sizeof(struct pthreadpool_job));
131 if (pool->jobs == NULL) {
136 pool->head = pool->num_jobs = 0;
138 ret = pthread_mutex_init(&pool->mutex, NULL);
145 ret = pthread_cond_init(&pool->condvar, NULL);
147 pthread_mutex_destroy(&pool->mutex);
154 pool->num_threads = 0;
155 pool->num_exited = 0;
157 pool->max_threads = max_threads;
160 ret = pthread_mutex_lock(&pthreadpools_mutex);
162 pthread_cond_destroy(&pool->condvar);
163 pthread_mutex_destroy(&pool->mutex);
168 DLIST_ADD(pthreadpools, pool);
170 ret = pthread_mutex_unlock(&pthreadpools_mutex);
173 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
180 static void pthreadpool_prepare(void)
183 struct pthreadpool *pool;
185 ret = pthread_mutex_lock(&pthreadpools_mutex);
190 while (pool != NULL) {
191 ret = pthread_mutex_lock(&pool->mutex);
197 static void pthreadpool_parent(void)
200 struct pthreadpool *pool;
202 for (pool = DLIST_TAIL(pthreadpools);
204 pool = DLIST_PREV(pool)) {
205 ret = pthread_mutex_unlock(&pool->mutex);
209 ret = pthread_mutex_unlock(&pthreadpools_mutex);
213 static void pthreadpool_child(void)
216 struct pthreadpool *pool;
218 for (pool = DLIST_TAIL(pthreadpools);
220 pool = DLIST_PREV(pool)) {
222 pool->num_threads = 0;
224 pool->num_exited = 0;
232 ret = pthread_mutex_unlock(&pool->mutex);
236 ret = pthread_mutex_unlock(&pthreadpools_mutex);
240 static void pthreadpool_prep_atfork(void)
242 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
247 * Do a pthread_join() on all children that have exited, pool->mutex must be
250 static void pthreadpool_join_children(struct pthreadpool *pool)
254 for (i=0; i<pool->num_exited; i++) {
257 ret = pthread_join(pool->exited[i], NULL);
260 * Severe internal error, we can't do much but
266 pool->num_exited = 0;
269 * Deliberately not free and NULL pool->exited. That will be
270 * re-used by realloc later.
275 * Destroy a thread pool, finishing all threads working for it
278 int pthreadpool_destroy(struct pthreadpool *pool)
282 ret = pthread_mutex_lock(&pool->mutex);
287 if ((pool->num_jobs != 0) || pool->shutdown) {
288 ret = pthread_mutex_unlock(&pool->mutex);
293 if (pool->num_threads > 0) {
295 * We have active threads, tell them to finish, wait for that.
300 if (pool->num_idle > 0) {
302 * Wake the idle threads. They will find
303 * pool->shutdown to be set and exit themselves
305 ret = pthread_cond_broadcast(&pool->condvar);
307 pthread_mutex_unlock(&pool->mutex);
312 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
314 if (pool->num_exited > 0) {
315 pthreadpool_join_children(pool);
319 * A thread that shuts down will also signal
322 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
324 pthread_mutex_unlock(&pool->mutex);
330 ret = pthread_mutex_unlock(&pool->mutex);
334 ret = pthread_mutex_destroy(&pool->mutex);
335 ret1 = pthread_cond_destroy(&pool->condvar);
344 ret = pthread_mutex_lock(&pthreadpools_mutex);
348 DLIST_REMOVE(pthreadpools, pool);
349 ret = pthread_mutex_unlock(&pthreadpools_mutex);
360 * Prepare for pthread_exit(), pool->mutex must be locked
362 static void pthreadpool_server_exit(struct pthreadpool *pool)
366 pool->num_threads -= 1;
368 exited = (pthread_t *)realloc(
369 pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
371 if (exited == NULL) {
372 /* lost a thread status */
375 pool->exited = exited;
377 pool->exited[pool->num_exited] = pthread_self();
378 pool->num_exited += 1;
381 static bool pthreadpool_get_job(struct pthreadpool *p,
382 struct pthreadpool_job *job)
384 if (p->num_jobs == 0) {
387 *job = p->jobs[p->head];
388 p->head = (p->head+1) % p->jobs_array_len;
393 static bool pthreadpool_put_job(struct pthreadpool *p,
395 void (*fn)(void *private_data),
398 struct pthreadpool_job *job;
400 if (p->num_jobs == p->jobs_array_len) {
401 struct pthreadpool_job *tmp;
402 size_t new_len = p->jobs_array_len * 2;
405 p->jobs, sizeof(struct pthreadpool_job) * new_len);
412 * We just doubled the jobs array. The array implements a FIFO
413 * queue with a modulo-based wraparound, so we have to memcpy
414 * the jobs that are logically at the queue end but physically
415 * before the queue head into the reallocated area. The new
416 * space starts at the current jobs_array_len, and we have to
417 * copy everything before the current head job into the new
420 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
421 sizeof(struct pthreadpool_job) * p->head);
423 p->jobs_array_len = new_len;
426 job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
429 job->private_data = private_data;
436 static void *pthreadpool_server(void *arg)
438 struct pthreadpool *pool = (struct pthreadpool *)arg;
441 res = pthread_mutex_lock(&pool->mutex);
448 struct pthreadpool_job job;
451 * idle-wait at most 1 second. If nothing happens in that
452 * time, exit this thread.
455 clock_gettime(CLOCK_REALTIME, &ts);
458 while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
461 res = pthread_cond_timedwait(
462 &pool->condvar, &pool->mutex, &ts);
465 if (res == ETIMEDOUT) {
467 if (pool->num_jobs == 0) {
469 * we timed out and still no work for
472 pthreadpool_server_exit(pool);
473 pthread_mutex_unlock(&pool->mutex);
482 if (pthreadpool_get_job(pool, &job)) {
486 * Do the work with the mutex unlocked
489 res = pthread_mutex_unlock(&pool->mutex);
492 job.fn(job.private_data);
494 res = pthread_mutex_lock(&pool->mutex);
497 ret = pool->signal_fn(job.id,
498 job.fn, job.private_data,
499 pool->signal_fn_private_data);
501 pthreadpool_server_exit(pool);
502 pthread_mutex_unlock(&pool->mutex);
507 if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
509 * No more work to do and we're asked to shut down, so
512 pthreadpool_server_exit(pool);
514 if (pool->num_threads == 0) {
516 * Ping the main thread waiting for all of us
517 * workers to have quit.
519 pthread_cond_broadcast(&pool->condvar);
522 pthread_mutex_unlock(&pool->mutex);
528 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
529 void (*fn)(void *private_data), void *private_data)
533 sigset_t mask, omask;
535 res = pthread_mutex_lock(&pool->mutex);
540 if (pool->shutdown) {
542 * Protect against the pool being shut down while
543 * trying to add a job
545 res = pthread_mutex_unlock(&pool->mutex);
551 * Just some cleanup under the mutex
553 pthreadpool_join_children(pool);
556 * Add job to the end of the queue
558 if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
559 pthread_mutex_unlock(&pool->mutex);
563 if (pool->num_idle > 0) {
565 * We have idle threads, wake one.
567 res = pthread_cond_signal(&pool->condvar);
568 pthread_mutex_unlock(&pool->mutex);
572 if ((pool->max_threads != 0) &&
573 (pool->num_threads >= pool->max_threads)) {
575 * No more new threads, we just queue the request
577 pthread_mutex_unlock(&pool->mutex);
582 * Create a new worker thread. It should not receive any signals.
587 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
589 pthread_mutex_unlock(&pool->mutex);
593 res = pthread_create(&thread_id, NULL, pthreadpool_server,
596 pool->num_threads += 1;
599 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
601 pthread_mutex_unlock(&pool->mutex);