pthreadpool_pipe: Implement EBUSY for _destroy
authorVolker Lendecke <vl@samba.org>
Fri, 9 Sep 2016 13:18:41 +0000 (15:18 +0200)
committerJeremy Allison <jra@samba.org>
Tue, 4 Oct 2016 22:06:21 +0000 (00:06 +0200)
Restore EBUSY on pthreadpool_pipe_destroy.

We need to count jobs in pthreadpool_pipe so that pthreadpool can exit with
active jobs. Unfortunately this makes pthreadpool_pipe_add_job non-threadsafe.
We could add mutexes around "num_jobs", but this would mean another set of
pthread_atfork functions. As we don't use threaded pthreadpool_pipe_add_job
except in the tests, just remove the tests...

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/pthreadpool/pthreadpool_pipe.c
source3/lib/pthreadpool/tests.c

index f7995abb1405c13b6fdf7d66a7d22f98c509147d..d6d519aeba418e399b87b44cc4f6ed120dd78261 100644 (file)
@@ -24,6 +24,7 @@
 
 struct pthreadpool_pipe {
        struct pthreadpool *pool;
+       int num_jobs;
        pid_t pid;
        int pipe_fds[2];
 };
@@ -39,7 +40,7 @@ int pthreadpool_pipe_init(unsigned max_threads,
        struct pthreadpool_pipe *pool;
        int ret;
 
-       pool = malloc(sizeof(struct pthreadpool_pipe));
+       pool = calloc(1, sizeof(struct pthreadpool_pipe));
        if (pool == NULL) {
                return ENOMEM;
        }
@@ -88,6 +89,10 @@ int pthreadpool_pipe_destroy(struct pthreadpool_pipe *pool)
 {
        int ret;
 
+       if (pool->num_jobs != 0) {
+               return EBUSY;
+       }
+
        ret = pthreadpool_destroy(pool->pool);
        if (ret != 0) {
                return ret;
@@ -132,6 +137,7 @@ static int pthreadpool_pipe_reinit(struct pthreadpool_pipe *pool)
        }
 
        pool->pipe_fds[0] = signal_fd;
+       pool->num_jobs = 0;
 
        return 0;
 }
@@ -148,7 +154,13 @@ int pthreadpool_pipe_add_job(struct pthreadpool_pipe *pool, int job_id,
        }
 
        ret = pthreadpool_add_job(pool->pool, job_id, fn, private_data);
-       return ret;
+       if (ret != 0) {
+               return ret;
+       }
+
+       pool->num_jobs += 1;
+
+       return 0;
 }
 
 int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
@@ -159,7 +171,7 @@ int pthreadpool_pipe_signal_fd(struct pthreadpool_pipe *pool)
 int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
                                   unsigned num_jobids)
 {
-       ssize_t to_read, nread;
+       ssize_t to_read, nread, num_jobs;
        pid_t pid = getpid();
 
        if (pool->pid != pid) {
@@ -178,5 +190,13 @@ int pthreadpool_pipe_finished_jobs(struct pthreadpool_pipe *pool, int *jobids,
        if ((nread % sizeof(int)) != 0) {
                return -EINVAL;
        }
-       return nread / sizeof(int);
+
+       num_jobs = nread / sizeof(int);
+
+       if (num_jobs > pool->num_jobs) {
+               return -EINVAL;
+       }
+       pool->num_jobs -= num_jobs;
+
+       return num_jobs;
 }
index 0b48b41a7f0d0425518ba267b47fd2992b840122..933808e1777e25eb6216b2ce169040ad041ac2a2 100644 (file)
@@ -22,7 +22,7 @@ static int test_init(void)
        }
        ret = pthreadpool_pipe_destroy(p);
        if (ret != 0) {
-               fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
+               fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
                        strerror(ret));
                return -1;
        }
@@ -72,6 +72,11 @@ static int test_jobs(int num_threads, int num_jobs)
        for (i=0; i<num_jobs; i++) {
                int jobid = -1;
                ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
+               if (ret < 0) {
+                       fprintf(stderr, "pthreadpool_pipe_finished_jobs "
+                               "failed: %s\n", strerror(-ret));
+                       return -1;
+               }
                if ((ret != 1) || (jobid >= num_jobs)) {
                        fprintf(stderr, "invalid job number %d\n", jobid);
                        return -1;
@@ -103,7 +108,7 @@ static int test_busydestroy(void)
        struct pthreadpool_pipe *p;
        int timeout = 50;
        struct pollfd pfd;
-       int ret;
+       int ret, jobid;
 
        ret = pthreadpool_pipe_init(1, &p);
        if (ret != 0) {
@@ -128,6 +133,13 @@ static int test_busydestroy(void)
 
        poll(&pfd, 1, -1);
 
+       ret = pthreadpool_pipe_finished_jobs(p, &jobid, 1);
+       if (ret < 0) {
+               fprintf(stderr, "pthreadpool_pipe_finished_jobs failed: %s\n",
+                       strerror(-ret));
+               return -1;
+       }
+
        ret = pthreadpool_pipe_destroy(p);
        if (ret != 0) {
                fprintf(stderr, "pthreadpool_pipe_destroy failed: %s\n",
@@ -137,191 +149,6 @@ static int test_busydestroy(void)
        return 0;
 }
 
-struct threaded_state {
-       pthread_t tid;
-       struct pthreadpool_pipe *p;
-       int start_job;
-       int num_jobs;
-       int timeout;
-};
-
-static void *test_threaded_worker(void *p)
-{
-       struct threaded_state *state = (struct threaded_state *)p;
-       int i;
-
-       for (i=0; i<state->num_jobs; i++) {
-               int ret = pthreadpool_pipe_add_job(
-                       state->p, state->start_job + i,
-                       test_sleep, &state->timeout);
-               if (ret != 0) {
-                       fprintf(stderr, "pthreadpool_pipe_add_job failed: "
-                               "%s\n", strerror(ret));
-                       return NULL;
-               }
-       }
-       return NULL;
-}
-
-static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
-                               int num_jobs)
-{
-       struct pthreadpool_pipe **pools;
-       struct threaded_state *states;
-       struct threaded_state *state;
-       struct pollfd *pfds;
-       char *finished;
-       pid_t child;
-       int i, ret, poolnum;
-       int received;
-
-       states = calloc(num_threads, sizeof(struct threaded_state));
-       if (states == NULL) {
-               fprintf(stderr, "calloc failed\n");
-               return -1;
-       }
-
-       finished = calloc(num_threads * num_jobs, 1);
-       if (finished == NULL) {
-               fprintf(stderr, "calloc failed\n");
-               return -1;
-       }
-
-       pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
-       if (pools == NULL) {
-               fprintf(stderr, "calloc failed\n");
-               return -1;
-       }
-
-       pfds = calloc(num_pools, sizeof(struct pollfd));
-       if (pfds == NULL) {
-               fprintf(stderr, "calloc failed\n");
-               return -1;
-       }
-
-       for (i=0; i<num_pools; i++) {
-               ret = pthreadpool_pipe_init(poolsize, &pools[i]);
-               if (ret != 0) {
-                       fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
-                               strerror(ret));
-                       return -1;
-               }
-               pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
-               pfds[i].events = POLLIN|POLLHUP;
-       }
-
-       poolnum = 0;
-
-       for (i=0; i<num_threads; i++) {
-               state = &states[i];
-
-               state->p = pools[poolnum];
-               poolnum = (poolnum + 1) % num_pools;
-
-               state->num_jobs = num_jobs;
-               state->timeout = 1;
-               state->start_job = i * num_jobs;
-
-               ret = pthread_create(&state->tid, NULL, test_threaded_worker,
-                                    state);
-               if (ret != 0) {
-                       fprintf(stderr, "pthread_create failed: %s\n",
-                               strerror(ret));
-                       return -1;
-               }
-       }
-
-       if (random() % 1) {
-               poll(NULL, 0, 1);
-       }
-
-       child = fork();
-       if (child < 0) {
-               fprintf(stderr, "fork failed: %s\n", strerror(errno));
-               return -1;
-       }
-       if (child == 0) {
-               for (i=0; i<num_pools; i++) {
-                       ret = pthreadpool_pipe_destroy(pools[i]);
-                       if (ret != 0) {
-                               fprintf(stderr, "pthreadpool_pipe_destroy "
-                                       "failed: %s\n", strerror(ret));
-                               exit(1);
-                       }
-               }
-               /* child */
-               exit(0);
-       }
-
-       for (i=0; i<num_threads; i++) {
-               ret = pthread_join(states[i].tid, NULL);
-               if (ret != 0) {
-                       fprintf(stderr, "pthread_join(%d) failed: %s\n",
-                               i, strerror(ret));
-                       return -1;
-               }
-       }
-
-       received = 0;
-
-       while (received < num_threads*num_jobs) {
-               int j;
-
-               ret = poll(pfds, num_pools, 1000);
-               if (ret == -1) {
-                       fprintf(stderr, "poll failed: %s\n",
-                               strerror(errno));
-                       return -1;
-               }
-               if (ret == 0) {
-                       fprintf(stderr, "\npoll timed out\n");
-                       break;
-               }
-
-               for (j=0; j<num_pools; j++) {
-                       int jobid = -1;
-
-                       if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
-                               continue;
-                       }
-
-                       ret = pthreadpool_pipe_finished_jobs(
-                               pools[j], &jobid, 1);
-                       if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
-                               fprintf(stderr, "invalid job number %d\n",
-                                       jobid);
-                               return -1;
-                       }
-                       finished[jobid] += 1;
-                       received += 1;
-               }
-       }
-
-       for (i=0; i<num_threads*num_jobs; i++) {
-               if (finished[i] != 1) {
-                       fprintf(stderr, "finished[%d] = %d\n",
-                               i, finished[i]);
-                       return -1;
-               }
-       }
-
-       for (i=0; i<num_pools; i++) {
-               ret = pthreadpool_pipe_destroy(pools[i]);
-               if (ret != 0) {
-                       fprintf(stderr, "pthreadpool_pipe_destroy failed: "
-                               "%s\n", strerror(ret));
-                       return -1;
-               }
-       }
-
-       free(pfds);
-       free(pools);
-       free(states);
-       free(finished);
-
-       return 0;
-}
-
 static int test_fork(void)
 {
        struct pthreadpool_pipe *p;
@@ -390,25 +217,6 @@ int main(void)
                return 1;
        }
 
-       /*
-        * Test 10 threads adding jobs on a single pool
-        */
-       ret = test_threaded_addjob(1, 10, 5, 5000);
-       if (ret != 0) {
-               fprintf(stderr, "test_jobs failed\n");
-               return 1;
-       }
-
-       /*
-        * Test 10 threads on 3 pools to verify our fork handling
-        * works right.
-        */
-       ret = test_threaded_addjob(3, 10, 5, 5000);
-       if (ret != 0) {
-               fprintf(stderr, "test_jobs failed\n");
-               return 1;
-       }
-
        printf("success\n");
        return 0;
 }