2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
29 #include "system/time.h"
30 #include "system/filesys.h"
32 #include "pthreadpool.h"
33 #include "lib/util/dlinklist.h"
35 struct pthreadpool_job {
36 struct pthreadpool_job *next;
38 void (*fn)(void *private_data);
44 * List pthreadpools for fork safety
46 struct pthreadpool *prev, *next;
49 * Control access to this struct
51 pthread_mutex_t mutex;
54 * Threads waiting for work do so here
56 pthread_cond_t condvar;
61 struct pthreadpool_job *jobs, *last_job;
69 * indicator to worker threads that they should shut down
74 * maximum number of threads
84 * Number of idle threads
89 * An array of threads that require joining.
92 pthread_t *exited; /* We alloc more */
95 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
96 static struct pthreadpool *pthreadpools = NULL;
97 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
99 static void pthreadpool_prep_atfork(void);
102 * Initialize a thread pool
105 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
107 struct pthreadpool *pool;
110 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
115 ret = pipe(pool->sig_pipe);
122 ret = pthread_mutex_init(&pool->mutex, NULL);
124 close(pool->sig_pipe[0]);
125 close(pool->sig_pipe[1]);
130 ret = pthread_cond_init(&pool->condvar, NULL);
132 pthread_mutex_destroy(&pool->mutex);
133 close(pool->sig_pipe[0]);
134 close(pool->sig_pipe[1]);
140 pool->jobs = pool->last_job = NULL;
141 pool->num_threads = 0;
142 pool->num_exited = 0;
144 pool->max_threads = max_threads;
147 ret = pthread_mutex_lock(&pthreadpools_mutex);
149 pthread_cond_destroy(&pool->condvar);
150 pthread_mutex_destroy(&pool->mutex);
151 close(pool->sig_pipe[0]);
152 close(pool->sig_pipe[1]);
156 DLIST_ADD(pthreadpools, pool);
158 ret = pthread_mutex_unlock(&pthreadpools_mutex);
161 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
168 static void pthreadpool_prepare(void)
171 struct pthreadpool *pool;
173 ret = pthread_mutex_lock(&pthreadpools_mutex);
178 while (pool != NULL) {
179 ret = pthread_mutex_lock(&pool->mutex);
185 static void pthreadpool_parent(void)
188 struct pthreadpool *pool;
190 pool = DLIST_TAIL(pthreadpools);
193 ret = pthread_mutex_unlock(&pool->mutex);
196 if (pool == pthreadpools) {
202 ret = pthread_mutex_unlock(&pthreadpools_mutex);
206 static void pthreadpool_child(void)
209 struct pthreadpool *pool;
211 pool = DLIST_TAIL(pthreadpools);
214 close(pool->sig_pipe[0]);
215 close(pool->sig_pipe[1]);
217 ret = pipe(pool->sig_pipe);
220 pool->num_threads = 0;
222 pool->num_exited = 0;
228 while (pool->jobs != NULL) {
229 struct pthreadpool_job *job;
231 pool->jobs = job->next;
234 pool->last_job = NULL;
236 ret = pthread_mutex_unlock(&pool->mutex);
239 if (pool == pthreadpools) {
245 ret = pthread_mutex_unlock(&pthreadpools_mutex);
249 static void pthreadpool_prep_atfork(void)
251 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
256 * Return the file descriptor which becomes readable when a job has
260 int pthreadpool_signal_fd(struct pthreadpool *pool)
262 return pool->sig_pipe[0];
266 * Do a pthread_join() on all children that have exited, pool->mutex must be
269 static void pthreadpool_join_children(struct pthreadpool *pool)
273 for (i=0; i<pool->num_exited; i++) {
274 pthread_join(pool->exited[i], NULL);
276 pool->num_exited = 0;
279 * Deliberately not free and NULL pool->exited. That will be
280 * re-used by realloc later.
285 * Fetch a finished job number from the signal pipe
288 int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
296 while ((nread == -1) && (errno == EINTR)) {
297 nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
302 if (nread != sizeof(int)) {
310 * Destroy a thread pool, finishing all threads working for it
313 int pthreadpool_destroy(struct pthreadpool *pool)
317 ret = pthread_mutex_lock(&pool->mutex);
322 if ((pool->jobs != NULL) || pool->shutdown) {
323 ret = pthread_mutex_unlock(&pool->mutex);
328 if (pool->num_threads > 0) {
330 * We have active threads, tell them to finish, wait for that.
335 if (pool->num_idle > 0) {
337 * Wake the idle threads. They will find pool->quit to
338 * be set and exit themselves
340 ret = pthread_cond_broadcast(&pool->condvar);
342 pthread_mutex_unlock(&pool->mutex);
347 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
349 if (pool->num_exited > 0) {
350 pthreadpool_join_children(pool);
354 * A thread that shuts down will also signal
357 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
359 pthread_mutex_unlock(&pool->mutex);
365 ret = pthread_mutex_unlock(&pool->mutex);
369 ret = pthread_mutex_destroy(&pool->mutex);
370 ret1 = pthread_cond_destroy(&pool->condvar);
379 ret = pthread_mutex_lock(&pthreadpools_mutex);
383 DLIST_REMOVE(pthreadpools, pool);
384 ret = pthread_mutex_unlock(&pthreadpools_mutex);
387 close(pool->sig_pipe[0]);
388 pool->sig_pipe[0] = -1;
390 close(pool->sig_pipe[1]);
391 pool->sig_pipe[1] = -1;
400 * Prepare for pthread_exit(), pool->mutex must be locked
402 static void pthreadpool_server_exit(struct pthreadpool *pool)
406 pool->num_threads -= 1;
408 exited = (pthread_t *)realloc(
409 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
411 if (exited == NULL) {
412 /* lost a thread status */
415 pool->exited = exited;
417 pool->exited[pool->num_exited] = pthread_self();
418 pool->num_exited += 1;
421 static void *pthreadpool_server(void *arg)
423 struct pthreadpool *pool = (struct pthreadpool *)arg;
426 res = pthread_mutex_lock(&pool->mutex);
433 struct pthreadpool_job *job;
436 * idle-wait at most 1 second. If nothing happens in that
437 * time, exit this thread.
440 clock_gettime(CLOCK_REALTIME, &ts);
443 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
446 res = pthread_cond_timedwait(
447 &pool->condvar, &pool->mutex, &ts);
450 if (res == ETIMEDOUT) {
452 if (pool->jobs == NULL) {
454 * we timed out and still no work for
457 pthreadpool_server_exit(pool);
458 pthread_mutex_unlock(&pool->mutex);
473 * Ok, there's work for us to do, remove the job from
474 * the pthreadpool list
476 pool->jobs = job->next;
477 if (pool->last_job == job) {
478 pool->last_job = NULL;
482 * Do the work with the mutex unlocked
485 res = pthread_mutex_unlock(&pool->mutex);
488 job->fn(job->private_data);
490 written = write(pool->sig_pipe[1], &job->id,
495 res = pthread_mutex_lock(&pool->mutex);
498 if (written != sizeof(int)) {
499 pthreadpool_server_exit(pool);
500 pthread_mutex_unlock(&pool->mutex);
505 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
507 * No more work to do and we're asked to shut down, so
510 pthreadpool_server_exit(pool);
512 if (pool->num_threads == 0) {
514 * Ping the main thread waiting for all of us
515 * workers to have quit.
517 pthread_cond_broadcast(&pool->condvar);
520 pthread_mutex_unlock(&pool->mutex);
526 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
527 void (*fn)(void *private_data), void *private_data)
529 struct pthreadpool_job *job;
532 sigset_t mask, omask;
534 job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
540 job->private_data = private_data;
544 res = pthread_mutex_lock(&pool->mutex);
550 if (pool->shutdown) {
552 * Protect against the pool being shut down while
553 * trying to add a job
555 res = pthread_mutex_unlock(&pool->mutex);
562 * Just some cleanup under the mutex
564 pthreadpool_join_children(pool);
567 * Add job to the end of the queue
569 if (pool->jobs == NULL) {
573 pool->last_job->next = job;
575 pool->last_job = job;
577 if (pool->num_idle > 0) {
579 * We have idle threads, wake one.
581 res = pthread_cond_signal(&pool->condvar);
582 pthread_mutex_unlock(&pool->mutex);
586 if ((pool->max_threads != 0) &&
587 (pool->num_threads >= pool->max_threads)) {
589 * No more new threads, we just queue the request
591 pthread_mutex_unlock(&pool->mutex);
596 * Create a new worker thread. It should not receive any signals.
601 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
603 pthread_mutex_unlock(&pool->mutex);
607 res = pthread_create(&thread_id, NULL, pthreadpool_server,
610 pool->num_threads += 1;
613 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
615 pthread_mutex_unlock(&pool->mutex);