HACK force pthreadpool_sync.c
authorStefan Metzmacher <metze@samba.org>
Wed, 17 Aug 2016 12:32:34 +0000 (14:32 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:52:28 +0000 (09:52 +0200)
lib/pthreadpool/tests.c
lib/pthreadpool/wscript_build

index f0ae0aa4a93b1ec2ffe9df85ef768b3c8a968463..150b81e02152b5f94acf796629b8dc0aace67dce 100644 (file)
@@ -3,7 +3,8 @@
 #include <poll.h>
 #include <errno.h>
 #include <stdlib.h>
-#include <pthread.h>
+//#include <pthread.h>
+#undef HAVE_PTHREAD
 #include <unistd.h>
 #include <sys/types.h>
 #include <sys/wait.h>
@@ -105,6 +106,7 @@ static int test_jobs(int num_threads, int num_jobs)
        return 0;
 }
 
+#ifdef HAVE_PTHREAD
 static int test_busydestroy(void)
 {
        struct pthreadpool_pipe *p;
@@ -153,6 +155,192 @@ static int test_busydestroy(void)
        return 0;
 }
 
+struct threaded_state {
+       pthread_t tid;
+       struct pthreadpool_pipe *p;
+       int start_job;
+       int num_jobs;
+       int timeout;
+};
+
+static void *test_threaded_worker(void *p)
+{
+       struct threaded_state *state = (struct threaded_state *)p;
+       int i;
+
+       for (i=0; i<state->num_jobs; i++) {
+               int ret = pthreadpool_pipe_add_job(
+                       state->p, state->start_job + i,
+                       test_sleep, &state->timeout);
+               if (ret != 0) {
+                       fprintf(stderr, "pthreadpool_pipe_add_job failed: "
+                               "%s\n", strerror(ret));
+                       return NULL;
+               }
+       }
+       return NULL;
+}
+
+static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
+                               int num_jobs)
+{
+       struct pthreadpool_pipe **pools;
+       struct threaded_state *states;
+       struct threaded_state *state;
+       struct pollfd *pfds;
+       char *finished;
+       pid_t child;
+       int i, ret, poolnum;
+       int received;
+
+       states = calloc(num_threads, sizeof(struct threaded_state));
+       if (states == NULL) {
+               fprintf(stderr, "calloc failed\n");
+               return -1;
+       }
+
+       finished = calloc(num_threads * num_jobs, 1);
+       if (finished == NULL) {
+               fprintf(stderr, "calloc failed\n");
+               return -1;
+       }
+
+       pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
+       if (pools == NULL) {
+               fprintf(stderr, "calloc failed\n");
+               return -1;
+       }
+
+       pfds = calloc(num_pools, sizeof(struct pollfd));
+       if (pfds == NULL) {
+               fprintf(stderr, "calloc failed\n");
+               return -1;
+       }
+
+       for (i=0; i<num_pools; i++) {
+               ret = pthreadpool_pipe_init(poolsize, &pools[i]);
+               if (ret != 0) {
+                       fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
+                               strerror(ret));
+                       return -1;
+               }
+               pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
+               pfds[i].events = POLLIN|POLLHUP;
+       }
+
+       poolnum = 0;
+
+       for (i=0; i<num_threads; i++) {
+               state = &states[i];
+
+               state->p = pools[poolnum];
+               poolnum = (poolnum + 1) % num_pools;
+
+               state->num_jobs = num_jobs;
+               state->timeout = 1;
+               state->start_job = i * num_jobs;
+
+               ret = pthread_create(&state->tid, NULL, test_threaded_worker,
+                                    state);
+               if (ret != 0) {
+                       fprintf(stderr, "pthread_create failed: %s\n",
+                               strerror(ret));
+                       return -1;
+               }
+       }
+
+       if (random() % 1) {
+               poll(NULL, 0, 1);
+       }
+
+       child = fork();
+       if (child < 0) {
+               fprintf(stderr, "fork failed: %s\n", strerror(errno));
+               return -1;
+       }
+       if (child == 0) {
+               for (i=0; i<num_pools; i++) {
+                       ret = pthreadpool_pipe_destroy(pools[i]);
+                       if (ret != 0) {
+                               fprintf(stderr, "pthreadpool_pipe_destroy "
+                                       "failed: %s\n", strerror(ret));
+                               exit(1);
+                       }
+               }
+               /* child */
+               exit(0);
+       }
+
+       for (i=0; i<num_threads; i++) {
+               ret = pthread_join(states[i].tid, NULL);
+               if (ret != 0) {
+                       fprintf(stderr, "pthread_join(%d) failed: %s\n",
+                               i, strerror(ret));
+                       return -1;
+               }
+       }
+
+       received = 0;
+
+       while (received < num_threads*num_jobs) {
+               int j;
+
+               ret = poll(pfds, num_pools, 1000);
+               if (ret == -1) {
+                       fprintf(stderr, "poll failed: %s\n",
+                               strerror(errno));
+                       return -1;
+               }
+               if (ret == 0) {
+                       fprintf(stderr, "\npoll timed out\n");
+                       break;
+               }
+
+               for (j=0; j<num_pools; j++) {
+                       int jobid = -1;
+
+                       if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
+                               continue;
+                       }
+
+                       ret = pthreadpool_pipe_finished_jobs(
+                               pools[j], &jobid, 1);
+                       if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
+                               fprintf(stderr, "invalid job number %d\n",
+                                       jobid);
+                               return -1;
+                       }
+                       finished[jobid] += 1;
+                       received += 1;
+               }
+       }
+
+       for (i=0; i<num_threads*num_jobs; i++) {
+               if (finished[i] != 1) {
+                       fprintf(stderr, "finished[%d] = %d\n",
+                               i, finished[i]);
+                       return -1;
+               }
+       }
+
+       for (i=0; i<num_pools; i++) {
+               ret = pthreadpool_pipe_destroy(pools[i]);
+               if (ret != 0) {
+                       fprintf(stderr, "pthreadpool_pipe_destroy failed: "
+                               "%s\n", strerror(ret));
+                       return -1;
+               }
+       }
+
+       free(pfds);
+       free(pools);
+       free(states);
+       free(finished);
+
+       return 0;
+}
+#endif /* HAVE_PTHREAD */
+
 static int test_fork(void)
 {
        struct pthreadpool_pipe *p;
@@ -487,6 +675,7 @@ int main(void)
                return 1;
        }
 
+#ifdef HAVE_PTHREAD
        ret = test_busydestroy();
        if (ret != 0) {
                fprintf(stderr, "test_busydestroy failed\n");
@@ -505,6 +694,26 @@ int main(void)
                return 1;
        }
 
+       /*
+        * Test 10 threads adding jobs on a single pool
+        */
+       ret = test_threaded_addjob(1, 10, 5, 5000);
+       if (ret != 0) {
+               fprintf(stderr, "test_jobs failed\n");
+               return 1;
+       }
+
+       /*
+        * Test 10 threads on 3 pools to verify our fork handling
+        * works right.
+        */
+       ret = test_threaded_addjob(3, 10, 5, 5000);
+       if (ret != 0) {
+               fprintf(stderr, "test_jobs failed\n");
+               return 1;
+       }
+#endif
+
        printf("success\n");
        return 0;
 }
index 57df25548b12acceed8b16aad8fb15ade8873536..a9a1baa6b1a71f11194468471dbc3c9d01f7fa47 100644 (file)
@@ -1,6 +1,6 @@
 #!/usr/bin/env python
 
-if bld.env.WITH_PTHREADPOOL:
+if False and bld.env.WITH_PTHREADPOOL:
     bld.SAMBA_SUBSYSTEM('PTHREADPOOL',
                          source='''pthreadpool.c
                                    pthreadpool_pipe.c