pthreadpool: Use detached threads
authorVolker Lendecke <vl@samba.org>
Fri, 9 Sep 2016 11:07:57 +0000 (13:07 +0200)
committerJeremy Allison <jra@samba.org>
Tue, 4 Oct 2016 22:06:21 +0000 (00:06 +0200)
So far we used joinable threads. This prevented pthreadpool_destroy with
blocked threads. This patch converts pthreadpool to detached threads. Now
pthreadpool_destroy does not have to wait for the idle threads to finish, it
can immediately return. pthreadpool_destroy will tell all threads to exit, and
the last active thread will actually free(pthreadpool).

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/pthreadpool/pthreadpool.c
source3/lib/pthreadpool/pthreadpool.h

index a306c88fdcf1b20fccdeafae63d897754f7e6113..a1eb9241a4dbccaaf3dd4f7fa12e90b64c8f0884 100644 (file)
@@ -89,12 +89,6 @@ struct pthreadpool {
         * Number of idle threads
         */
        int num_idle;
-
-       /*
-        * An array of threads that require joining.
-        */
-       int                     num_exited;
-       pthread_t               *exited; /* We alloc more */
 };
 
 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -152,8 +146,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 
        pool->shutdown = false;
        pool->num_threads = 0;
-       pool->num_exited = 0;
-       pool->exited = NULL;
        pool->max_threads = max_threads;
        pool->num_idle = 0;
 
@@ -220,11 +212,6 @@ static void pthreadpool_child(void)
             pool = DLIST_PREV(pool)) {
 
                pool->num_threads = 0;
-
-               pool->num_exited = 0;
-               free(pool->exited);
-               pool->exited = NULL;
-
                pool->num_idle = 0;
                pool->head = 0;
                pool->num_jobs = 0;
@@ -243,36 +230,40 @@ static void pthreadpool_prep_atfork(void)
                       pthreadpool_child);
 }
 
-/*
- * Do a pthread_join() on all children that have exited, pool->mutex must be
- * locked
- */
-static void pthreadpool_join_children(struct pthreadpool *pool)
+static int pthreadpool_free(struct pthreadpool *pool)
 {
-       int i;
+       int ret, ret1;
 
-       for (i=0; i<pool->num_exited; i++) {
-               int ret;
+       ret = pthread_mutex_unlock(&pool->mutex);
+       assert(ret == 0);
 
-               ret = pthread_join(pool->exited[i], NULL);
-               if (ret != 0) {
-                       /*
-                        * Severe internal error, we can't do much but
-                        * abort here.
-                        */
-                       abort();
-               }
+       ret = pthread_mutex_destroy(&pool->mutex);
+       ret1 = pthread_cond_destroy(&pool->condvar);
+
+       if (ret != 0) {
+               return ret;
+       }
+       if (ret1 != 0) {
+               return ret1;
        }
-       pool->num_exited = 0;
 
-       /*
-        * Deliberately not free and NULL pool->exited. That will be
-        * re-used by realloc later.
-        */
+       ret = pthread_mutex_lock(&pthreadpools_mutex);
+       if (ret != 0) {
+               return ret;
+       }
+       DLIST_REMOVE(pthreadpools, pool);
+       ret = pthread_mutex_unlock(&pthreadpools_mutex);
+       assert(ret == 0);
+
+       free(pool->jobs);
+       free(pool);
+
+       return 0;
 }
 
 /*
- * Destroy a thread pool, finishing all threads working for it
+ * Destroy a thread pool. Wake up all idle threads for exit. The last
+ * one will free the pool.
  */
 
 int pthreadpool_destroy(struct pthreadpool *pool)
@@ -284,98 +275,49 @@ int pthreadpool_destroy(struct pthreadpool *pool)
                return ret;
        }
 
-       if ((pool->num_jobs != 0) || pool->shutdown) {
+       if (pool->num_threads == 0) {
+               ret = pthreadpool_free(pool);
+               return ret;
+       }
+
+       if (pool->shutdown) {
                ret = pthread_mutex_unlock(&pool->mutex);
                assert(ret == 0);
                return EBUSY;
        }
 
-       if (pool->num_threads > 0) {
-               /*
-                * We have active threads, tell them to finish, wait for that.
-                */
-
-               pool->shutdown = true;
-
-               if (pool->num_idle > 0) {
-                       /*
-                        * Wake the idle threads. They will find
-                        * pool->shutdown to be set and exit themselves
-                        */
-                       ret = pthread_cond_broadcast(&pool->condvar);
-                       if (ret != 0) {
-                               pthread_mutex_unlock(&pool->mutex);
-                               return ret;
-                       }
-               }
-
-               while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
-
-                       if (pool->num_exited > 0) {
-                               pthreadpool_join_children(pool);
-                               continue;
-                       }
-                       /*
-                        * A thread that shuts down will also signal
-                        * pool->condvar
-                        */
-                       ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
-                       if (ret != 0) {
-                               pthread_mutex_unlock(&pool->mutex);
-                               return ret;
-                       }
-               }
-       }
+       /*
+        * We have active threads, tell them to finish.
+        */
 
-       ret = pthread_mutex_unlock(&pool->mutex);
-       if (ret != 0) {
-               return ret;
-       }
-       ret = pthread_mutex_destroy(&pool->mutex);
-       ret1 = pthread_cond_destroy(&pool->condvar);
+       pool->shutdown = true;
 
-       if (ret != 0) {
-               return ret;
-       }
-       if (ret1 != 0) {
-               return ret1;
-       }
+       ret = pthread_cond_broadcast(&pool->condvar);
 
-       ret = pthread_mutex_lock(&pthreadpools_mutex);
-       if (ret != 0) {
-               return ret;
-       }
-       DLIST_REMOVE(pthreadpools, pool);
-       ret = pthread_mutex_unlock(&pthreadpools_mutex);
-       assert(ret == 0);
+       ret1 = pthread_mutex_unlock(&pool->mutex);
+       assert(ret1 == 0);
 
-       free(pool->exited);
-       free(pool->jobs);
-       free(pool);
-
-       return 0;
+       return ret;
 }
 
 /*
- * Prepare for pthread_exit(), pool->mutex must be locked
+ * Prepare for pthread_exit(), pool->mutex must be locked and will be
+ * unlocked here. This is a bit of a layering violation, but here we
+ * also take care of removing the pool if we're the last thread.
  */
 static void pthreadpool_server_exit(struct pthreadpool *pool)
 {
-       pthread_t *exited;
+       int ret;
 
        pool->num_threads -= 1;
 
-       exited = (pthread_t *)realloc(
-               pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
-
-       if (exited == NULL) {
-               /* lost a thread status */
+       if (pool->shutdown && (pool->num_threads == 0)) {
+               pthreadpool_free(pool);
                return;
        }
-       pool->exited = exited;
 
-       pool->exited[pool->num_exited] = pthread_self();
-       pool->num_exited += 1;
+       ret = pthread_mutex_unlock(&pool->mutex);
+       assert(ret == 0);
 }
 
 static bool pthreadpool_get_job(struct pthreadpool *p,
@@ -470,7 +412,6 @@ static void *pthreadpool_server(void *arg)
                                         * us. Exit.
                                         */
                                        pthreadpool_server_exit(pool);
-                                       pthread_mutex_unlock(&pool->mutex);
                                        return NULL;
                                }
 
@@ -500,7 +441,6 @@ static void *pthreadpool_server(void *arg)
 
                        if (ret != 0) {
                                pthreadpool_server_exit(pool);
-                               pthread_mutex_unlock(&pool->mutex);
                                return NULL;
                        }
                }
@@ -511,16 +451,6 @@ static void *pthreadpool_server(void *arg)
                         * exit
                         */
                        pthreadpool_server_exit(pool);
-
-                       if (pool->num_threads == 0) {
-                               /*
-                                * Ping the main thread waiting for all of us
-                                * workers to have quit.
-                                */
-                               pthread_cond_broadcast(&pool->condvar);
-                       }
-
-                       pthread_mutex_unlock(&pool->mutex);
                        return NULL;
                }
        }
@@ -529,6 +459,7 @@ static void *pthreadpool_server(void *arg)
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data)
 {
+       pthread_attr_t thread_attr;
        pthread_t thread_id;
        int res;
        sigset_t mask, omask;
@@ -548,11 +479,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                return EINVAL;
        }
 
-       /*
-        * Just some cleanup under the mutex
-        */
-       pthreadpool_join_children(pool);
-
        /*
         * Add job to the end of the queue
         */
@@ -585,20 +511,37 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
 
        sigfillset(&mask);
 
+       res = pthread_attr_init(&thread_attr);
+       if (res != 0) {
+               pthread_mutex_unlock(&pool->mutex);
+               return res;
+       }
+
+       res = pthread_attr_setdetachstate(
+               &thread_attr, PTHREAD_CREATE_DETACHED);
+       if (res != 0) {
+               pthread_attr_destroy(&thread_attr);
+               pthread_mutex_unlock(&pool->mutex);
+               return res;
+       }
+
         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
        if (res != 0) {
+               pthread_attr_destroy(&thread_attr);
                pthread_mutex_unlock(&pool->mutex);
                return res;
        }
 
        res = pthread_create(&thread_id, NULL, pthreadpool_server,
-                               (void *)pool);
+                            (void *)pool);
        if (res == 0) {
                pool->num_threads += 1;
        }
 
         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
 
+       pthread_attr_destroy(&thread_attr);
+
        pthread_mutex_unlock(&pool->mutex);
        return res;
 }
index ee9d9578050bfe7f552dc43d2106fd3d9d174bda..defbe5a9f6238fdfc42ccc36e307701a3ddf1f1a 100644 (file)
@@ -53,8 +53,13 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 /**
  * @brief Destroy a pthreadpool
  *
- * Destroy a pthreadpool. If jobs are still active, this returns
- * EBUSY.
+ * Destroy a pthreadpool. If jobs are submitted, but not yet active in
+ * a thread, they won't get executed. If a job has already been
+ * submitted to a thread, the job function will continue running, and
+ * the signal function might still be called. The caller of
+ * pthreadpool_init must make sure the required resources are still
+ * around when the pool is destroyed with pending jobs.  The last
+ * thread to exit will finally free() the pool memory.
  *
  * @param[in]  pool            The pool to destroy
  * @return                     success: 0, failure: errno