#include <poll.h>
#include <errno.h>
#include <stdlib.h>
-//#include <pthread.h>
-#undef HAVE_PTHREAD
+#include <pthread.h>
#include <unistd.h>
#include <sys/types.h>
#include <sys/wait.h>
return 0;
}
-#ifdef HAVE_PTHREAD
static int test_busydestroy(void)
{
struct pthreadpool_pipe *p;
return 0;
}
-struct threaded_state {
- pthread_t tid;
- struct pthreadpool_pipe *p;
- int start_job;
- int num_jobs;
- int timeout;
-};
-
-static void *test_threaded_worker(void *p)
-{
- struct threaded_state *state = (struct threaded_state *)p;
- int i;
-
- for (i=0; i<state->num_jobs; i++) {
- int ret = pthreadpool_pipe_add_job(
- state->p, state->start_job + i,
- test_sleep, &state->timeout);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_add_job failed: "
- "%s\n", strerror(ret));
- return NULL;
- }
- }
- return NULL;
-}
-
-static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
- int num_jobs)
-{
- struct pthreadpool_pipe **pools;
- struct threaded_state *states;
- struct threaded_state *state;
- struct pollfd *pfds;
- char *finished;
- pid_t child;
- int i, ret, poolnum;
- int received;
-
- states = calloc(num_threads, sizeof(struct threaded_state));
- if (states == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- finished = calloc(num_threads * num_jobs, 1);
- if (finished == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- pools = calloc(num_pools, sizeof(struct pthreadpool_pipe *));
- if (pools == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- pfds = calloc(num_pools, sizeof(struct pollfd));
- if (pfds == NULL) {
- fprintf(stderr, "calloc failed\n");
- return -1;
- }
-
- for (i=0; i<num_pools; i++) {
- ret = pthreadpool_pipe_init(poolsize, &pools[i]);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_init failed: %s\n",
- strerror(ret));
- return -1;
- }
- pfds[i].fd = pthreadpool_pipe_signal_fd(pools[i]);
- pfds[i].events = POLLIN|POLLHUP;
- }
-
- poolnum = 0;
-
- for (i=0; i<num_threads; i++) {
- state = &states[i];
-
- state->p = pools[poolnum];
- poolnum = (poolnum + 1) % num_pools;
-
- state->num_jobs = num_jobs;
- state->timeout = 1;
- state->start_job = i * num_jobs;
-
- ret = pthread_create(&state->tid, NULL, test_threaded_worker,
- state);
- if (ret != 0) {
- fprintf(stderr, "pthread_create failed: %s\n",
- strerror(ret));
- return -1;
- }
- }
-
- if (random() % 1) {
- poll(NULL, 0, 1);
- }
-
- child = fork();
- if (child < 0) {
- fprintf(stderr, "fork failed: %s\n", strerror(errno));
- return -1;
- }
- if (child == 0) {
- for (i=0; i<num_pools; i++) {
- ret = pthreadpool_pipe_destroy(pools[i]);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_destroy "
- "failed: %s\n", strerror(ret));
- exit(1);
- }
- }
- /* child */
- exit(0);
- }
-
- for (i=0; i<num_threads; i++) {
- ret = pthread_join(states[i].tid, NULL);
- if (ret != 0) {
- fprintf(stderr, "pthread_join(%d) failed: %s\n",
- i, strerror(ret));
- return -1;
- }
- }
-
- received = 0;
-
- while (received < num_threads*num_jobs) {
- int j;
-
- ret = poll(pfds, num_pools, 1000);
- if (ret == -1) {
- fprintf(stderr, "poll failed: %s\n",
- strerror(errno));
- return -1;
- }
- if (ret == 0) {
- fprintf(stderr, "\npoll timed out\n");
- break;
- }
-
- for (j=0; j<num_pools; j++) {
- int jobid = -1;
-
- if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
- continue;
- }
-
- ret = pthreadpool_pipe_finished_jobs(
- pools[j], &jobid, 1);
- if ((ret != 1) || (jobid >= num_jobs * num_threads)) {
- fprintf(stderr, "invalid job number %d\n",
- jobid);
- return -1;
- }
- finished[jobid] += 1;
- received += 1;
- }
- }
-
- for (i=0; i<num_threads*num_jobs; i++) {
- if (finished[i] != 1) {
- fprintf(stderr, "finished[%d] = %d\n",
- i, finished[i]);
- return -1;
- }
- }
-
- for (i=0; i<num_pools; i++) {
- ret = pthreadpool_pipe_destroy(pools[i]);
- if (ret != 0) {
- fprintf(stderr, "pthreadpool_pipe_destroy failed: "
- "%s\n", strerror(ret));
- return -1;
- }
- }
-
- free(pfds);
- free(pools);
- free(states);
- free(finished);
-
- return 0;
-}
-#endif /* HAVE_PTHREAD */
-
static int test_fork(void)
{
struct pthreadpool_pipe *p;
return 1;
}
-#ifdef HAVE_PTHREAD
ret = test_busydestroy();
if (ret != 0) {
fprintf(stderr, "test_busydestroy failed\n");
return 1;
}
- /*
- * Test 10 threads adding jobs on a single pool
- */
- ret = test_threaded_addjob(1, 10, 5, 5000);
- if (ret != 0) {
- fprintf(stderr, "test_jobs failed\n");
- return 1;
- }
-
- /*
- * Test 10 threads on 3 pools to verify our fork handling
- * works right.
- */
- ret = test_threaded_addjob(3, 10, 5, 5000);
- if (ret != 0) {
- fprintf(stderr, "test_jobs failed\n");
- return 1;
- }
-#endif
-
printf("success\n");
return 0;
}