8 #include "pthreadpool.h"
/*
 * test_init: smoke test that creates a pool with pthreadpool_init(1, &p)
 * and tears it down again with pthreadpool_destroy(p). Failures are
 * reported on stderr.
 *
 * NOTE(review): this listing omits lines (braces, the checks on ret, the
 * strerror() continuation lines and the return statements); the comments
 * describe only the lines that are visible.
 */
10 static int test_init(void)
12 struct pthreadpool *p;
/* First argument is 1 -- presumably the pool's thread limit; confirm against pthreadpool.h. */
15 ret = pthreadpool_init(1, &p);
17 fprintf(stderr, "pthreadpool_init failed: %s\n",
21 ret = pthreadpool_destroy(p);
/* Diagnostic for the teardown step: name the call that actually failed. */
23 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
/*
 * test_sleep: job function passed to pthreadpool_add_job() by the tests in
 * this file. ptr is interpreted as a pointer to an int timeout.
 *
 * NOTE(review): the condition guarding the diagnostic below is not visible
 * in this listing.
 */
30 static void test_sleep(void *ptr)
32 int *ptimeout = (int *)ptr;
/* poll() with no descriptors: used purely as a delay of *ptimeout (milliseconds per poll(2)). */
34 ret = poll(NULL, 0, *ptimeout);
/* Report poll()'s return value together with the current errno text. */
36 fprintf(stderr, "poll returned %d (%s)\n",
37 ret, strerror(errno));
/*
 * test_jobs: create a pool with pthreadpool_init(num_threads, &p), submit
 * num_jobs test_sleep jobs with ids 0..num_jobs-1, collect num_jobs
 * completions via pthreadpool_finished_job(), verify that every entry of
 * finished[] is 1 and destroy the pool. Failures are reported on stderr.
 *
 * NOTE(review): this listing omits lines (local declarations, braces, most
 * checks on ret, the update of finished[] and the return statements); the
 * comments describe only the lines that are visible.
 */
41 static int test_jobs(int num_threads, int num_jobs)
44 struct pthreadpool *p;
/* num_jobs zero-initialised bytes, indexed by job id in the check further down. */
48 finished = (char *)calloc(1, num_jobs);
49 if (finished == NULL) {
50 fprintf(stderr, "calloc failed\n");
54 ret = pthreadpool_init(num_threads, &p);
56 fprintf(stderr, "pthreadpool_init failed: %s\n",
/* Submit the jobs; the loop index doubles as the job id. */
61 for (i=0; i<num_jobs; i++) {
62 ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
64 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
/* Collect one finished job per iteration. */
70 for (i=0; i<num_jobs; i++) {
72 ret = pthreadpool_finished_job(p, &jobid);
/* Ids were handed out as 0..num_jobs-1, so anything outside that range -- negatives included -- is invalid. */
73 if ((ret != 0) || (jobid < 0) || (jobid >= num_jobs)) {
74 fprintf(stderr, "invalid job number %d\n", jobid);
/* Each job id is expected to have been recorded exactly once. */
80 for (i=0; i<num_jobs; i++) {
81 if (finished[i] != 1) {
82 fprintf(stderr, "finished[%d] = %d\n",
88 ret = pthreadpool_destroy(p);
90 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
/*
 * test_busydestroy: exercises pthreadpool_destroy() on a pool that has just
 * been given a job. The first destroy attempt is the one under test (the
 * "Could destroy a busy pool" diagnostic belongs to it); a second destroy is
 * attempted afterwards.
 *
 * NOTE(review): this listing omits lines (local declarations, braces, the
 * checks on ret, the wait on pfd and the return statements). Presumably the
 * first destroy is expected to fail and the second to succeed once the job
 * has finished -- confirm against the full file.
 */
99 static int test_busydestroy(void)
101 struct pthreadpool *p;
106 ret = pthreadpool_init(1, &p);
108 fprintf(stderr, "pthreadpool_init failed: %s\n",
/* Queue a single test_sleep job (id 1) so the pool is busy. */
112 ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
114 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
/* Destroy attempt while the job is still outstanding. */
118 ret = pthreadpool_destroy(p);
120 fprintf(stderr, "Could destroy a busy pool\n");
/* Prepare to watch the pool's signal fd; the poll() call itself is not visible in this listing. */
124 pfd.fd = pthreadpool_signal_fd(p);
125 pfd.events = POLLIN|POLLERR;
/* Second destroy attempt. */
129 ret = pthreadpool_destroy(p);
131 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
/*
 * Per-thread state handed to test_threaded_worker() through pthread_create().
 * Only the pool pointer is visible in this listing; the rest of the file also
 * accesses members named tid, num_jobs, start_job and timeout.
 */
138 struct threaded_state {
/* Pool this thread adds its jobs to. */
140 struct pthreadpool *p;
/*
 * test_threaded_worker: thread start routine. p points to a
 * struct threaded_state; the thread adds state->num_jobs test_sleep jobs to
 * state->p, using consecutive ids starting at state->start_job.
 *
 * NOTE(review): the check on ret and the return statements are not visible
 * in this listing.
 */
146 static void *test_threaded_worker(void *p)
148 struct threaded_state *state = (struct threaded_state *)p;
151 for (i=0; i<state->num_jobs; i++) {
152 int ret = pthreadpool_add_job(state->p, state->start_job + i,
153 test_sleep, &state->timeout);
155 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
/*
 * test_threaded_addjob: several threads add jobs to one or more pools
 * concurrently, and the caller then collects every completion.
 *
 * Visible flow: allocate states[], finished[], pools[] and pfds[]; create
 * num_pools pools with pthreadpool_init(poolsize, ...) and record each
 * pool's signal fd for poll(); start num_threads test_threaded_worker
 * threads, assigning pools round-robin; join the threads; poll the signal
 * fds until num_threads*num_jobs completions have been received; verify
 * that every finished[] entry is 1; destroy the pools.
 *
 * NOTE(review): this listing omits lines (the rest of the parameter list,
 * local declarations, braces, most checks on ret, the fork() call and the
 * return statements); the comments describe only the lines that are visible.
 */
163 static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
166 struct pthreadpool **pools;
167 struct threaded_state *states;
168 struct threaded_state *state;
/* One state record per worker thread. */
175 states = calloc(num_threads, sizeof(struct threaded_state));
176 if (states == NULL) {
177 fprintf(stderr, "calloc failed\n");
/* One zero-initialised counter byte per job id across all threads. */
181 finished = calloc(num_threads * num_jobs, 1);
182 if (finished == NULL) {
183 fprintf(stderr, "calloc failed\n");
187 pools = calloc(num_pools, sizeof(struct pthreadpool *));
189 fprintf(stderr, "calloc failed\n");
193 pfds = calloc(num_pools, sizeof(struct pollfd));
195 fprintf(stderr, "calloc failed\n");
/* Create the pools and remember each pool's signal fd for the poll() loop below. */
199 for (i=0; i<num_pools; i++) {
200 ret = pthreadpool_init(poolsize, &pools[i]);
202 fprintf(stderr, "pthreadpool_init failed: %s\n",
206 pfds[i].fd = pthreadpool_signal_fd(pools[i]);
207 pfds[i].events = POLLIN|POLLHUP;
/* Start the worker threads; pools are handed out round-robin. */
212 for (i=0; i<num_threads; i++) {
215 state->p = pools[poolnum];
216 poolnum = (poolnum + 1) % num_pools;
218 state->num_jobs = num_jobs;
/* Thread i owns the id range [i*num_jobs, (i+1)*num_jobs). */
220 state->start_job = i * num_jobs;
222 ret = pthread_create(&state->tid, NULL, test_threaded_worker,
225 fprintf(stderr, "pthread_create failed: %s\n",
/* Diagnostic for a failed fork(); the fork() call itself is not visible in this listing. */
237 fprintf(stderr, "fork failed: %s\n", strerror(errno));
/* NOTE(review): presumably this destroy loop runs in the forked child -- confirm against the full file. */
241 for (i=0; i<num_pools; i++) {
242 ret = pthreadpool_destroy(pools[i]);
244 fprintf(stderr, "pthreadpool_destroy failed: "
245 "%s\n", strerror(ret));
/* Wait until every worker thread has finished adding its jobs. */
253 for (i=0; i<num_threads; i++) {
254 ret = pthread_join(states[i].tid, NULL);
256 fprintf(stderr, "pthread_join(%d) failed: %s\n",
/* Collect completions until all submitted jobs have been seen. */
264 while (received < num_threads*num_jobs) {
/* Wait up to 1000 ms for any pool to signal a finished job. */
267 ret = poll(pfds, num_pools, 1000);
269 fprintf(stderr, "poll failed: %s\n",
274 fprintf(stderr, "\npoll timed out\n");
278 for (j=0; j<num_pools; j++) {
/* Only query pools whose signal fd reported activity. */
281 if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
285 ret = pthreadpool_finished_job(pools[j], &jobid);
/*
 * Submitted ids are 0 .. num_jobs*num_threads-1. jobid indexes finished[]
 * below, so reject anything outside that range, negatives included.
 */
286 if ((ret != 0) || (jobid < 0) || (jobid >= num_jobs * num_threads)) {
287 fprintf(stderr, "invalid job number %d\n",
291 finished[jobid] += 1;
/* Each job id is expected to have been recorded exactly once. */
296 for (i=0; i<num_threads*num_jobs; i++) {
297 if (finished[i] != 1) {
298 fprintf(stderr, "finished[%d] = %d\n",
304 for (i=0; i<num_pools; i++) {
305 ret = pthreadpool_destroy(pools[i]);
307 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
/*
 * Test driver fragment: runs each test in turn and names the failing test
 * on stderr.
 *
 * NOTE(review): the enclosing function header (presumably main), the
 * test_init() call, the checks on ret and the return statements are not
 * visible in this listing.
 */
327 fprintf(stderr, "test_init failed\n");
331 ret = test_jobs(10, 10000);
333 fprintf(stderr, "test_jobs failed\n");
337 ret = test_busydestroy();
339 fprintf(stderr, "test_busydestroy failed\n");
344 * Test 10 threads adding jobs on a single pool
346 ret = test_threaded_addjob(1, 10, 5, 5000);
348 fprintf(stderr, "test_threaded_addjob failed\n");
353 * Test 10 threads on 3 pools to verify our fork handling
356 ret = test_threaded_addjob(3, 10, 5, 5000);
358 fprintf(stderr, "test_threaded_addjob failed\n");