2 * Unix SMB/CIFS implementation.
3 * thread pool implementation
4 * Copyright (C) Volker Lendecke 2009
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License as published by
8 * the Free Software Foundation; either version 3 of the License, or
9 * (at your option) any later version.
11 * This program is distributed in the hope that it will be useful,
12 * but WITHOUT ANY WARRANTY; without even the implied warranty of
13 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 * GNU General Public License for more details.
16 * You should have received a copy of the GNU General Public License
17 * along with this program. If not, see <http://www.gnu.org/licenses/>.
29 #include "system/time.h"
30 #include "system/filesys.h"
32 #include "pthreadpool.h"
33 #include "lib/util/dlinklist.h"
/*
 * One queued unit of work. Jobs are chained through ->next to form the
 * pool's job queue; a worker thread runs a job by calling ->fn.
 * NOTE(review): further members used elsewhere in this file (id,
 * private_data) and the closing brace are not visible in this excerpt.
 */
35 struct pthreadpool_job {
36 struct pthreadpool_job *next;
38 void (*fn)(void *private_data);
/*
 * Members of the per-pool state structure.
 * NOTE(review): the struct's opening line and several members that other
 * functions in this file use (sig_pipe, shutdown, max_threads, num_threads,
 * num_idle, num_exited) are not visible in this excerpt.
 */
44 * List pthreadpools for fork safety
46 struct pthreadpool *prev, *next;
49 * Control access to this struct
51 pthread_mutex_t mutex;
54 * Threads waiting for work do so here
56 pthread_cond_t condvar;
/* jobs is the head of the work queue; last_job is the tail new jobs are appended to */
61 struct pthreadpool_job *jobs, *last_job;
69 * indicator to worker threads that they should shut down
74 * maximum number of threads
84 * Number of idle threads
89 * An array of threads that require joining.
/* Grown with realloc() as workers exit; drained by pthreadpool_join_children() */
92 pthread_t *exited; /* We alloc more */
/*
 * Global list of pools and the mutex guarding it. pthreadpool_init() adds
 * to the list and pthreadpool_destroy() removes from it, both while holding
 * pthreadpools_mutex.
 */
95 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
96 static struct pthreadpool *pthreadpools = NULL;
/* Passed to pthread_once() in pthreadpool_init() so the atfork setup runs only once */
97 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
/* Forward declaration: referenced by pthreadpool_init() before its definition */
99 static void pthreadpool_prep_atfork(void);
102 * Initialize a thread pool
/*
 * Create a new pool: allocate the struct, create the signalling pipe, the
 * mutex and the condition variable, reset the bookkeeping fields, link the
 * pool into the global pthreadpools list and make sure the fork handlers
 * are set up.
 *
 * max_threads is stored in the pool as its thread limit.
 * presult is an out-parameter; presumably it receives the new pool (the
 * assignment is not visible in this excerpt -- confirm).
 *
 * NOTE(review): the error checks, free() calls and return statements are
 * not visible here. The cleanup calls below are grouped as they would be on
 * the failure path of the step just before them.
 */
105 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
107 struct pthreadpool *pool;
110 pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
/* Pipe over which workers report finished job ids */
115 ret = pipe(pool->sig_pipe);
122 ret = pthread_mutex_init(&pool->mutex, NULL);
/* Cleanup after the mutex step: release both pipe ends */
124 close(pool->sig_pipe[0]);
125 close(pool->sig_pipe[1]);
130 ret = pthread_cond_init(&pool->condvar, NULL);
/* Cleanup after the condvar step: undo mutex and pipe */
132 pthread_mutex_destroy(&pool->mutex);
133 close(pool->sig_pipe[0]);
134 close(pool->sig_pipe[1]);
/* Start with an empty job queue and no threads */
140 pool->jobs = pool->last_job = NULL;
141 pool->num_threads = 0;
142 pool->num_exited = 0;
144 pool->max_threads = max_threads;
/* The global pool list is only modified under pthreadpools_mutex */
147 ret = pthread_mutex_lock(&pthreadpools_mutex);
/* Cleanup after the global lock step: undo condvar, mutex and pipe */
149 pthread_cond_destroy(&pool->condvar);
150 pthread_mutex_destroy(&pool->mutex);
151 close(pool->sig_pipe[0]);
152 close(pool->sig_pipe[1]);
156 DLIST_ADD(pthreadpools, pool);
158 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/* Run pthreadpool_prep_atfork() once per process, however many pools are created */
161 pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
/*
 * Fork "prepare" handler (first argument to pthread_atfork() in
 * pthreadpool_prep_atfork()). Takes the global list mutex and then the
 * mutex of each pool visited by the loop, so they are held across fork().
 * NOTE(review): the loop's initialisation and advance are not visible in
 * this excerpt.
 */
168 static void pthreadpool_prepare(void)
171 struct pthreadpool *pool;
173 ret = pthread_mutex_lock(&pthreadpools_mutex);
178 while (pool != NULL) {
179 ret = pthread_mutex_lock(&pool->mutex);
/*
 * Fork "parent" handler (second argument to pthread_atfork() in
 * pthreadpool_prep_atfork()). Releases pool mutexes starting from the tail
 * of the global list, then releases the global list mutex.
 * NOTE(review): the loop construct around the unlock is not visible in this
 * excerpt.
 */
185 static void pthreadpool_parent(void)
188 struct pthreadpool *pool;
190 pool = DLIST_TAIL(pthreadpools);
193 ret = pthread_mutex_unlock(&pool->mutex);
/* Reaching the list head means every pool has been handled */
196 if (pool == pthreadpools) {
202 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/*
 * Presumably the fork "child" handler (the third pthread_atfork() argument
 * is not visible in this excerpt -- confirm). Starting from the list tail,
 * it replaces the pool's signalling pipe with a fresh one, zeroes the
 * thread counters, empties the job queue and unlocks the pool mutex; it
 * finally unlocks the global list mutex.
 * NOTE(review): the loop construct and the handling of each dequeued job
 * (e.g. whether it is freed) are not visible here.
 */
206 static void pthreadpool_child(void)
209 struct pthreadpool *pool;
211 pool = DLIST_TAIL(pthreadpools);
/* Replace the pipe so the old descriptors are not kept in use here */
214 close(pool->sig_pipe[0]);
215 close(pool->sig_pipe[1]);
217 ret = pipe(pool->sig_pipe);
/* Reset thread bookkeeping */
220 pool->num_threads = 0;
222 pool->num_exited = 0;
/* Walk the queue until it is empty */
228 while (pool->jobs != NULL) {
229 struct pthreadpool_job *job;
231 pool->jobs = job->next;
234 pool->last_job = NULL;
236 ret = pthread_mutex_unlock(&pool->mutex);
/* Reaching the list head means every pool has been handled */
239 if (pool == pthreadpools) {
245 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/*
 * Register the fork handlers for this module. Invoked through pthread_once()
 * from pthreadpool_init().
 * NOTE(review): the remaining pthread_atfork() argument(s) are not visible
 * in this excerpt.
 */
249 static void pthreadpool_prep_atfork(void)
251 pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
256 * Return the file descriptor which becomes readable when a job has finished
/*
 * Return sig_pipe[0], the read end of the pool's signalling pipe. Worker
 * threads write a job's id to the other end after running it, and
 * pthreadpool_finished_job() reads from this descriptor.
 */
260 int pthreadpool_signal_fd(struct pthreadpool *pool)
262 return pool->sig_pipe[0];
266 * Do a pthread_join() on all children that have exited, pool->mutex must be locked
/*
 * pthread_join() every thread id recorded in pool->exited[0..num_exited-1]
 * and reset num_exited to 0. The thread's return value is discarded (NULL
 * is passed as the result pointer).
 */
269 static void pthreadpool_join_children(struct pthreadpool *pool)
273 for (i=0; i<pool->num_exited; i++) {
274 pthread_join(pool->exited[i], NULL);
276 pool->num_exited = 0;
279 * Deliberately not free and NULL pool->exited. That will be
280 * re-used by realloc later.
285 * Fetch a finished job number from the signal pipe
/*
 * Read one int from the read end of the signalling pipe, retrying the
 * read() while it fails with EINTR. A read of anything other than exactly
 * sizeof(int) bytes is handled as a special case.
 * NOTE(review): the return values for the success and short-read cases are
 * not visible in this excerpt.
 */
288 int pthreadpool_finished_job(struct pthreadpool *pool)
296 while ((nread == -1) && (errno == EINTR)) {
297 nread = read(pool->sig_pipe[0], &result, sizeof(int));
302 if (nread != sizeof(int)) {
309 * Destroy a thread pool, finishing all threads working for it
/*
 * Tear down a pool. Under pool->mutex it first checks for queued jobs or a
 * shutdown already in progress, then wakes idle workers and waits on the
 * condition variable until no threads remain, joining exited threads as it
 * goes. Afterwards the mutex and condition variable are destroyed, the
 * pool is unlinked from the global list and both pipe ends are closed.
 * NOTE(review): the error checks, the setting of the shutdown flag, the
 * return statements and any free() of the pool are not visible in this
 * excerpt.
 */
312 int pthreadpool_destroy(struct pthreadpool *pool)
316 ret = pthread_mutex_lock(&pool->mutex);
/* Pending jobs or an earlier destroy in progress: drop the lock again */
321 if ((pool->jobs != NULL) || pool->shutdown) {
322 ret = pthread_mutex_unlock(&pool->mutex);
327 if (pool->num_threads > 0) {
329 * We have active threads, tell them to finish, wait for that.
334 if (pool->num_idle > 0) {
336 * Wake the idle threads. They will find pool->quit to
337 * be set and exit themselves
339 ret = pthread_cond_broadcast(&pool->condvar);
341 pthread_mutex_unlock(&pool->mutex);
/* Loop until every worker has left and every exited thread has been joined */
346 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
348 if (pool->num_exited > 0) {
349 pthreadpool_join_children(pool);
353 * A thread that shuts down will also signal
356 ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
358 pthread_mutex_unlock(&pool->mutex);
364 ret = pthread_mutex_unlock(&pool->mutex);
/* Both destroy calls are made; their results are kept separately in ret and ret1 */
368 ret = pthread_mutex_destroy(&pool->mutex);
369 ret1 = pthread_cond_destroy(&pool->condvar);
/* Unlink from the global pool list under the list mutex */
378 ret = pthread_mutex_lock(&pthreadpools_mutex);
382 DLIST_REMOVE(pthreadpools, pool);
383 ret = pthread_mutex_unlock(&pthreadpools_mutex);
/* Close both pipe ends and mark the descriptors invalid */
386 close(pool->sig_pipe[0]);
387 pool->sig_pipe[0] = -1;
389 close(pool->sig_pipe[1]);
390 pool->sig_pipe[1] = -1;
399 * Prepare for pthread_exit(), pool->mutex must be locked
401 static void pthreadpool_server_exit(struct pthreadpool *pool)
405 pool->num_threads -= 1;
407 exited = (pthread_t *)realloc(
408 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
410 if (exited == NULL) {
411 /* lost a thread status */
414 pool->exited = exited;
416 pool->exited[pool->num_exited] = pthread_self();
417 pool->num_exited += 1;
/*
 * Worker thread main function; arg is the struct pthreadpool the thread
 * serves. It is passed to pthread_create() from pthreadpool_add_job().
 *
 * With pool->mutex held, the thread waits on the condition variable for a
 * job or a shutdown request. It takes the first job off the queue, runs it
 * with the mutex released, writes the job's id to the signalling pipe and
 * re-takes the mutex. The thread leaves via pthreadpool_server_exit() in
 * three cases: an idle timeout with no work, a short write to the pipe, or
 * an empty queue while shutdown is set.
 *
 * NOTE(review): the outer loop, the idle counter updates, the timeout
 * arithmetic on ts, the assignment of job and the return statements are
 * not visible in this excerpt.
 */
420 static void *pthreadpool_server(void *arg)
422 struct pthreadpool *pool = (struct pthreadpool *)arg;
425 res = pthread_mutex_lock(&pool->mutex);
432 struct pthreadpool_job *job;
435 * idle-wait at most 1 second. If nothing happens in that
436 * time, exit this thread.
/* Current wall-clock time; ts is the deadline passed to pthread_cond_timedwait() below */
439 clock_gettime(CLOCK_REALTIME, &ts);
442 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
445 res = pthread_cond_timedwait(
446 &pool->condvar, &pool->mutex, &ts);
449 if (res == ETIMEDOUT) {
/* Re-check the queue: only exit if there is still no job after the timeout */
451 if (pool->jobs == NULL) {
453 * we timed out and still no work for
456 pthreadpool_server_exit(pool);
457 pthread_mutex_unlock(&pool->mutex);
472 * Ok, there's work for us to do, remove the job from
473 * the pthreadpool list
475 pool->jobs = job->next;
/* The dequeued job was also the tail, so the queue is now empty */
476 if (pool->last_job == job) {
477 pool->last_job = NULL;
481 * Do the work with the mutex unlocked
484 res = pthread_mutex_unlock(&pool->mutex);
487 job->fn(job->private_data);
/* Report completion: the job id goes to the write end of the signalling pipe */
489 written = write(pool->sig_pipe[1], &job->id,
494 res = pthread_mutex_lock(&pool->mutex);
/* A short or failed write ends this worker thread */
497 if (written != sizeof(int)) {
498 pthreadpool_server_exit(pool);
499 pthread_mutex_unlock(&pool->mutex);
504 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
506 * No more work to do and we're asked to shut down, so
509 pthreadpool_server_exit(pool);
511 if (pool->num_threads == 0) {
513 * Ping the main thread waiting for all of us
514 * workers to have quit.
516 pthread_cond_broadcast(&pool->condvar);
519 pthread_mutex_unlock(&pool->mutex);
/*
 * Queue a job on the pool.
 *
 * pool:         pool to queue the job on
 * job_id:       caller-chosen id; presumably stored in the job and later
 *               written to the signalling pipe by the worker (the
 *               assignment is not visible in this excerpt -- confirm)
 * fn:           function the worker calls to run the job
 * private_data: argument handed to fn
 *
 * Under pool->mutex: bail out if the pool is shutting down, join any
 * exited threads, append the job to the queue, then either wake an idle
 * worker, leave the job queued because the thread limit is reached, or
 * create a new worker thread with all signals blocked.
 *
 * NOTE(review): the error checks, free() calls and return statements are
 * not visible in this excerpt.
 */
525 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
526 void (*fn)(void *private_data), void *private_data)
528 struct pthreadpool_job *job;
531 sigset_t mask, omask;
533 job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
539 job->private_data = private_data;
543 res = pthread_mutex_lock(&pool->mutex);
549 if (pool->shutdown) {
551 * Protect against the pool being shut down while
552 * trying to add a job
554 res = pthread_mutex_unlock(&pool->mutex);
561 * Just some cleanup under the mutex
563 pthreadpool_join_children(pool);
566 * Add job to the end of the queue
568 if (pool->jobs == NULL) {
572 pool->last_job->next = job;
574 pool->last_job = job;
576 if (pool->num_idle > 0) {
578 * We have idle threads, wake one.
580 res = pthread_cond_signal(&pool->condvar);
581 pthread_mutex_unlock(&pool->mutex);
/* max_threads == 0 disables this limit check */
585 if ((pool->max_threads != 0) &&
586 (pool->num_threads >= pool->max_threads)) {
588 * No more new threads, we just queue the request
590 pthread_mutex_unlock(&pool->mutex);
595 * Create a new worker thread. It should not receive any signals.
/* Block the signals in mask while creating the thread; the previous mask is saved in omask */
600 res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
602 pthread_mutex_unlock(&pool->mutex);
606 res = pthread_create(&thread_id, NULL, pthreadpool_server,
609 pool->num_threads += 1;
/*
 * Restore the caller's signal mask.
 * NOTE(review): the pthread_sigmask() call sits inside assert(), so a build
 * with NDEBUG defined drops the call and the mask is never restored.
 * Consider storing the result in a variable and asserting on that.
 */
612 assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
614 pthread_mutex_unlock(&pool->mutex);