Revert "pthreadpool: add pthreadpool_restart_check[_monitor_{fd,drain}]()"
authorRalph Boehme <slow@samba.org>
Sun, 23 Dec 2018 08:43:07 +0000 (09:43 +0100)
committerStefan Metzmacher <metze@samba.org>
Fri, 11 Jan 2019 22:11:13 +0000 (23:11 +0100)
This reverts commit 3c4cdb290723432b00ff9ff88b892cb4e66e76cd.

See the discussion in

https://lists.samba.org/archive/samba-technical/2018-December/131731.html

for the reasoning behind this revert.

Signed-off-by: Ralph Boehme <slow@samba.org>
Reviewed-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
lib/pthreadpool/pthreadpool.c
lib/pthreadpool/pthreadpool.h
lib/pthreadpool/pthreadpool_sync.c

index d482c1599410e0045daaa9afdd64d55b95ca93eb..c2bafd52c08b556a708223563c1fd154609287d3 100644 (file)
@@ -24,7 +24,6 @@
 #include "system/filesys.h"
 #include "pthreadpool.h"
 #include "lib/util/dlinklist.h"
-#include "lib/util/blocking.h"
 
 #ifdef NDEBUG
 #undef NDEBUG
@@ -54,8 +53,6 @@ struct pthreadpool {
         */
        pthread_cond_t condvar;
 
-       int check_pipefd[2];
-
        /*
         * Array of jobs
         */
@@ -140,7 +137,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
 {
        struct pthreadpool *pool;
        int ret;
-       bool ok;
 
        pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
        if (pool == NULL) {
@@ -158,52 +154,10 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                return ENOMEM;
        }
 
-       ret = pipe(pool->check_pipefd);
-       if (ret != 0) {
-               free(pool->jobs);
-               free(pool);
-               return ENOMEM;
-       }
-
-       ok = smb_set_close_on_exec(pool->check_pipefd[0]);
-       if (!ok) {
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
-               free(pool->jobs);
-               free(pool);
-               return EINVAL;
-       }
-       ok = smb_set_close_on_exec(pool->check_pipefd[1]);
-       if (!ok) {
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
-               free(pool->jobs);
-               free(pool);
-               return EINVAL;
-       }
-       ret = set_blocking(pool->check_pipefd[0], true);
-       if (ret == -1) {
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
-               free(pool->jobs);
-               free(pool);
-               return EINVAL;
-       }
-       ret = set_blocking(pool->check_pipefd[1], false);
-       if (ret == -1) {
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
-               free(pool->jobs);
-               free(pool);
-               return EINVAL;
-       }
-
        pool->head = pool->num_jobs = 0;
 
        ret = pthread_mutex_init(&pool->mutex, NULL);
        if (ret != 0) {
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -212,8 +166,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
        ret = pthread_cond_init(&pool->condvar, NULL);
        if (ret != 0) {
                pthread_mutex_destroy(&pool->mutex);
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -223,8 +175,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
        if (ret != 0) {
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -247,8 +197,6 @@ int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
                pthread_mutex_destroy(&pool->fork_mutex);
                pthread_cond_destroy(&pool->condvar);
                pthread_mutex_destroy(&pool->mutex);
-               close(pool->check_pipefd[0]);
-               close(pool->check_pipefd[1]);
                free(pool->jobs);
                free(pool);
                return ret;
@@ -412,14 +360,6 @@ static void pthreadpool_child(void)
                pool->head = 0;
                pool->num_jobs = 0;
                pool->stopped = true;
-               if (pool->check_pipefd[0] != -1) {
-                       close(pool->check_pipefd[0]);
-                       pool->check_pipefd[0] = -1;
-               }
-               if (pool->check_pipefd[1] != -1) {
-                       close(pool->check_pipefd[1]);
-                       pool->check_pipefd[1] = -1;
-               }
 
                ret = pthread_cond_init(&pool->condvar, NULL);
                assert(ret == 0);
@@ -482,14 +422,6 @@ static int pthreadpool_free(struct pthreadpool *pool)
                return ret2;
        }
 
-       if (pool->check_pipefd[0] != -1) {
-               close(pool->check_pipefd[0]);
-               pool->check_pipefd[0] = -1;
-       }
-       if (pool->check_pipefd[1] != -1) {
-               close(pool->check_pipefd[1]);
-               pool->check_pipefd[1] = -1;
-       }
        free(pool->jobs);
        free(pool);
 
@@ -506,15 +438,6 @@ static int pthreadpool_stop_locked(struct pthreadpool *pool)
 
        pool->stopped = true;
 
-       if (pool->check_pipefd[0] != -1) {
-               close(pool->check_pipefd[0]);
-               pool->check_pipefd[0] = -1;
-       }
-       if (pool->check_pipefd[1] != -1) {
-               close(pool->check_pipefd[1]);
-               pool->check_pipefd[1] = -1;
-       }
-
        if (pool->num_threads == 0) {
                return 0;
        }
@@ -599,33 +522,6 @@ static void pthreadpool_server_exit(struct pthreadpool *pool)
 
        free_it = (pool->destroyed && (pool->num_threads == 0));
 
-       while (true) {
-               uint8_t c = 0;
-               ssize_t nwritten = 0;
-
-               if (pool->check_pipefd[1] == -1) {
-                       break;
-               }
-
-               nwritten = write(pool->check_pipefd[1], &c, 1);
-               if (nwritten == -1) {
-                       if (errno == EINTR) {
-                               continue;
-                       }
-                       if (errno == EAGAIN) {
-                               break;
-                       }
-#ifdef EWOULDBLOCK
-                       if (errno == EWOULDBLOCK) {
-                               break;
-                       }
-#endif
-                       /* ignore ... */
-               }
-
-               break;
-       }
-
        ret = pthread_mutex_unlock(&pool->mutex);
        assert(ret == 0);
 
@@ -956,183 +852,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
        return res;
 }
 
-int pthreadpool_restart_check(struct pthreadpool *pool)
-{
-       int res;
-       int unlock_res;
-       unsigned possible_threads = 0;
-       unsigned missing_threads = 0;
-
-       assert(!pool->destroyed);
-
-       res = pthread_mutex_lock(&pool->mutex);
-       if (res != 0) {
-               return res;
-       }
-
-       if (pool->stopped) {
-               /*
-                * Protect against the pool being shut down while
-                * trying to add a job
-                */
-               unlock_res = pthread_mutex_unlock(&pool->mutex);
-               assert(unlock_res == 0);
-               return EINVAL;
-       }
-
-       if (pool->num_jobs == 0) {
-               /*
-                * This also handles the pool->max_threads == 0 case as it never
-                * calls pthreadpool_put_job()
-                */
-               unlock_res = pthread_mutex_unlock(&pool->mutex);
-               assert(unlock_res == 0);
-               return 0;
-       }
-
-       if (pool->num_idle > 0) {
-               /*
-                * We have idle threads and pending jobs,
-                * this means we better let all threads
-                * start and check for pending jobs.
-                */
-               res = pthread_cond_broadcast(&pool->condvar);
-               assert(res == 0);
-       }
-
-       if (pool->num_threads < pool->max_threads) {
-               possible_threads = pool->max_threads - pool->num_threads;
-       }
-
-       if (pool->num_idle < pool->num_jobs) {
-               missing_threads = pool->num_jobs - pool->num_idle;
-       }
-
-       missing_threads = MIN(missing_threads, possible_threads);
-
-       while (missing_threads > 0) {
-
-               res = pthreadpool_create_thread(pool);
-               if (res != 0) {
-                       break;
-               }
-
-               missing_threads--;
-       }
-
-       if (missing_threads == 0) {
-               /*
-                * Ok, we recreated all thread we need.
-                */
-               unlock_res = pthread_mutex_unlock(&pool->mutex);
-               assert(unlock_res == 0);
-               return 0;
-       }
-
-       if (pool->num_threads != 0) {
-               /*
-                * At least one thread is still available, let
-                * that one run the queued jobs.
-                */
-               unlock_res = pthread_mutex_unlock(&pool->mutex);
-               assert(unlock_res == 0);
-               return 0;
-       }
-
-       /*
-        * There's no thread available to run any pending jobs.
-        * The caller may want to cancel the jobs and destroy the pool.
-        * But that's up to the caller.
-        */
-       unlock_res = pthread_mutex_unlock(&pool->mutex);
-       assert(unlock_res == 0);
-
-       return res;
-}
-
-int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
-{
-       int fd;
-       int ret;
-       bool ok;
-
-       if (pool->stopped) {
-               errno = EINVAL;
-               return -1;
-       }
-
-       if (pool->check_pipefd[0] == -1) {
-               errno = ENOSYS;
-               return -1;
-       }
-
-       fd = dup(pool->check_pipefd[0]);
-       if (fd == -1) {
-               return -1;
-       }
-
-       ok = smb_set_close_on_exec(fd);
-       if (!ok) {
-               int saved_errno = errno;
-               close(fd);
-               errno = saved_errno;
-               return -1;
-       }
-
-       ret = set_blocking(fd, false);
-       if (ret == -1) {
-               int saved_errno = errno;
-               close(fd);
-               errno = saved_errno;
-               return -1;
-       }
-
-       return fd;
-}
-
-int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
-{
-       if (pool->stopped) {
-               return EINVAL;
-       }
-
-       if (pool->check_pipefd[0] == -1) {
-               return ENOSYS;
-       }
-
-       while (true) {
-               uint8_t buf[128];
-               ssize_t nread;
-
-               nread = read(pool->check_pipefd[0], buf, sizeof(buf));
-               if (nread == -1) {
-                       if (errno == EINTR) {
-                               continue;
-                       }
-                       if (errno == EAGAIN) {
-                               return 0;
-                       }
-#ifdef EWOULDBLOCK
-                       if (errno == EWOULDBLOCK) {
-                               return 0;
-                       }
-#endif
-                       if (errno == 0) {
-                               errno = INT_MAX;
-                       }
-
-                       return errno;
-               }
-
-               if (nread < sizeof(buf)) {
-                       return 0;
-               }
-       }
-
-       abort();
-       return INT_MAX;
-}
-
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data)
 {
index 543567ceaf78e177e0eb909fc2f92f2e7d419d5c..d8daf9e4519b95e68b17fb1d085c5e1611e9ba9e 100644 (file)
@@ -144,70 +144,6 @@ int pthreadpool_destroy(struct pthreadpool *pool);
 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                        void (*fn)(void *private_data), void *private_data);
 
-/**
- * @brief Check if the pthreadpool needs a restart.
- *
- * This checks if there are enough threads to run the already
- * queued jobs. This should be called only the callers signal_fn
- * (passed to pthreadpool_init()) returned an error, so
- * that the job's worker thread exited.
- *
- * Typically this is called once the file destriptor
- * returned by pthreadpool_restart_check_monitor_fd()
- * became readable and pthreadpool_restart_check_monitor_drain()
- * returned success.
- *
- * This function tries to restart the missing threads.
- *
- * @param[in]  pool            The pool to run the job on
- * @return                     success: 0, failure: errno
- *
- * @see pthreadpool_restart_check_monitor_fd
- * @see pthreadpool_restart_check_monitor_drain
- */
-int pthreadpool_restart_check(struct pthreadpool *pool);
-
-/**
- * @brief Return a file destriptor that monitors the pool.
- *
- * If the file destrictor becomes readable,
- * the event handler should call pthreadpool_restart_check_monitor_drain().
- *
- * pthreadpool_restart_check() should also be called once the
- * state is drained.
- *
- * This function returns a fresh fd using dup() each time.
- *
- * If the pool doesn't require restarts, this function
- * returns -1 and sets errno = ENOSYS. The caller
- * may ignore that situation.
- *
- * @param[in]  pool            The pool to run the job on
- * @return                     success: 0, failure: -1 (set errno)
- *
- * @see pthreadpool_restart_check_monitor_fd
- * @see pthreadpool_restart_check_monitor_drain
- */
-int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool);
-
-/**
- * @brief Drain the monitor file destriptor of the pool.
- *
- * If the file destrictor returned by pthreadpool_restart_check_monitor_fd()
- * becomes readable, pthreadpool_restart_check_monitor_drain() should be
- * called before pthreadpool_restart_check().
- *
- * If this function returns an error the caller should close
- * the file destriptor it got from pthreadpool_restart_check_monitor_fd().
- *
- * @param[in]  pool            The pool to run the job on
- * @return                     success: 0, failure: errno
- *
- * @see pthreadpool_restart_check_monitor_fd
- * @see pthreadpool_restart_check
- */
-int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool);
-
 /**
  * @brief Try to cancel a job in a pthreadpool
  *
index a476ea712c3a586fb40b736af8f2a701cd24aa54..2ed6f36dbbc70745cf46ba5ca01296b6ab156db9 100644 (file)
@@ -83,26 +83,6 @@ int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
                               pool->signal_fn_private_data);
 }
 
-int pthreadpool_restart_check(struct pthreadpool *pool)
-{
-       if (pool->stopped) {
-               return EINVAL;
-       }
-
-       return 0;
-}
-
-int pthreadpool_restart_check_monitor_fd(struct pthreadpool *pool)
-{
-       errno = ENOSYS;
-       return -1;
-}
-
-int pthreadpool_restart_check_monitor_drain(struct pthreadpool *pool)
-{
-       return EINVAL;
-}
-
 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
                              void (*fn)(void *private_data), void *private_data)
 {