8 #include "pthreadpool.h"
/*
 * Smoke test: set up a pool with pthreadpool_init(1, &p) and tear it down
 * again with pthreadpool_destroy(p), reporting on stderr which of the two
 * calls failed. The guards around the messages and the return statements
 * are not part of this excerpt.
 */
10 static int test_init(void)
12 struct pthreadpool *p;
15 ret = pthreadpool_init(1, &p);
17 fprintf(stderr, "pthreadpool_init failed: %s\n",
21 ret = pthreadpool_destroy(p);
/* This message reports the destroy call above, so it names that call. */
23 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
/*
 * Job callback handed to pthreadpool_add_job() by the tests in this file.
 *
 * "ptr" points at an int that is passed to poll() as its timeout while no
 * descriptors are watched (NULL, 0), so the call only waits (poll()'s
 * timeout is in milliseconds). The message below logs poll()'s return value
 * together with strerror(errno); the condition guarding it is not part of
 * this excerpt.
 */
30 static void test_sleep(void *ptr)
32 int *ptimeout = (int *)ptr;
34 ret = poll(NULL, 0, *ptimeout);
36 fprintf(stderr, "poll returned %d (%s)\n",
37 ret, strerror(errno));
/*
 * Queue num_jobs sleep jobs on one pool and check the completion reports.
 *
 * Visible steps: allocate a zeroed array of num_jobs bytes ("finished"),
 * initialise a pool with pthreadpool_init(num_threads, &p), add jobs with
 * ids 0..num_jobs-1 that all run test_sleep() on the same "timeout", fetch
 * num_jobs ids from pthreadpool_finished_job(), require finished[i] == 1
 * for every i, then destroy the pool.
 *
 * NOTE(review): the statement that updates finished[], the error guards and
 * the return values are not visible in this excerpt.
 */
41 static int test_jobs(int num_threads, int num_jobs)
44 struct pthreadpool *p;
/* One byte per job id, zero-initialised by calloc(). */
48 finished = (char *)calloc(1, num_jobs);
49 if (finished == NULL) {
50 fprintf(stderr, "calloc failed\n");
54 ret = pthreadpool_init(num_threads, &p);
56 fprintf(stderr, "pthreadpool_init failed: %s\n",
/* The loop index doubles as the job id. */
61 for (i=0; i<num_jobs; i++) {
62 ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
64 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
/* One pthreadpool_finished_job() call per queued job. */
70 for (i=0; i<num_jobs; i++) {
71 ret = pthreadpool_finished_job(p);
/* Only ids in [0, num_jobs) were queued; anything else is reported. */
72 if ((ret < 0) || (ret >= num_jobs)) {
73 fprintf(stderr, "invalid job number %d\n", ret);
/* Every job id must have been counted exactly once. */
79 for (i=0; i<num_jobs; i++) {
80 if (finished[i] != 1) {
81 fprintf(stderr, "finished[%d] = %d\n",
87 ret = pthreadpool_destroy(p);
89 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
/*
 * Exercise pthreadpool_destroy() on a pool that still has a job queued.
 *
 * Visible steps: initialise a pool with pthreadpool_init(1, &p), add job
 * id 1 running test_sleep(), call pthreadpool_destroy(), set up a pollfd on
 * pthreadpool_signal_fd(p), then call pthreadpool_destroy() a second time.
 *
 * NOTE(review): the guards around the messages, the poll() call that
 * presumably uses "pfd", and the return values are not visible in this
 * excerpt.
 */
98 static int test_busydestroy(void)
100 struct pthreadpool *p;
105 ret = pthreadpool_init(1, &p);
107 fprintf(stderr, "pthreadpool_init failed: %s\n",
111 ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
113 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
/* First destroy attempt, made right after the job was added. */
117 ret = pthreadpool_destroy(p);
/* Presumably printed when that destroy unexpectedly succeeds -- TODO confirm. */
119 fprintf(stderr, "Could destroy a busy pool\n");
/* Watch the pool's signal fd for input or an error condition. */
123 pfd.fd = pthreadpool_signal_fd(p);
124 pfd.events = POLLIN|POLLERR;
/* Second destroy attempt; a failure here is reported as an error. */
128 ret = pthreadpool_destroy(p);
130 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
/*
 * Per-thread bookkeeping for test_threaded_addjob(); a pointer to one of
 * these is the argument given to test_threaded_worker(). Only the pool
 * pointer is visible in this excerpt; the users of the struct also access
 * tid, num_jobs, start_job and timeout members.
 */
137 struct threaded_state {
/* Pool this thread adds its jobs to. */
139 struct pthreadpool *p;
/*
 * Thread start routine used with pthread_create() in test_threaded_addjob().
 *
 * "p" is a struct threaded_state. The thread adds state->num_jobs jobs to
 * state->p, with ids state->start_job .. state->start_job + num_jobs - 1,
 * each running test_sleep() on &state->timeout. The failure guard and the
 * return value are not part of this excerpt.
 */
145 static void *test_threaded_worker(void *p)
147 struct threaded_state *state = (struct threaded_state *)p;
150 for (i=0; i<state->num_jobs; i++) {
/* Offset by start_job to form this thread's job ids. */
151 int ret = pthreadpool_add_job(state->p, state->start_job + i,
152 test_sleep, &state->timeout);
154 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
/*
 * Have several threads add jobs concurrently to one or more pools and check
 * that every job is reported finished exactly once.
 *
 * num_pools   - number of pools to create
 * num_threads - number of threads started with pthread_create()
 * poolsize    - first argument of pthreadpool_init() for every pool
 * (the rest of the parameter list is not visible in this excerpt; the body
 * also uses "num_jobs" as the per-thread job count)
 *
 * Visible steps: allocate the per-thread states, a byte per job in
 * "finished", the pool pointers and one pollfd per pool; initialise the
 * pools; start the worker threads; join them; poll the pools' signal fds
 * and collect finished job ids until num_threads*num_jobs were received;
 * verify finished[]; destroy the pools.
 *
 * NOTE(review): a fork() happens between thread creation and the joins
 * (see the "fork failed" message), but the call itself, the guards and the
 * return values are not visible here.
 */
162 static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
165 struct pthreadpool **pools;
166 struct threaded_state *states;
167 struct threaded_state *state;
174 states = calloc(num_threads, sizeof(struct threaded_state));
175 if (states == NULL) {
176 fprintf(stderr, "calloc failed\n");
/* One byte per job id across all threads, zero-initialised. */
180 finished = calloc(num_threads * num_jobs, 1);
181 if (finished == NULL) {
182 fprintf(stderr, "calloc failed\n");
186 pools = calloc(num_pools, sizeof(struct pthreadpool *));
188 fprintf(stderr, "calloc failed\n");
192 pfds = calloc(num_pools, sizeof(struct pollfd));
194 fprintf(stderr, "calloc failed\n");
/* Create every pool and remember its signal fd for the poll() loop below. */
198 for (i=0; i<num_pools; i++) {
199 ret = pthreadpool_init(poolsize, &pools[i]);
201 fprintf(stderr, "pthreadpool_init failed: %s\n",
205 pfds[i].fd = pthreadpool_signal_fd(pools[i]);
206 pfds[i].events = POLLIN|POLLHUP;
/* Start the worker threads. */
211 for (i=0; i<num_threads; i++) {
/* Pools are handed out in turn, wrapping around after the last one. */
214 state->p = pools[poolnum];
215 poolnum = (poolnum + 1) % num_pools;
217 state->num_jobs = num_jobs;
/* Thread i uses job ids i*num_jobs .. (i+1)*num_jobs - 1. */
219 state->start_job = i * num_jobs;
221 ret = pthread_create(&state->tid, NULL, test_threaded_worker,
224 fprintf(stderr, "pthread_create failed: %s\n",
236 fprintf(stderr, "fork failed: %s\n", strerror(errno));
/*
 * NOTE(review): presumably this destroy loop runs in the forked child --
 * the surrounding condition is not visible; confirm against the full file.
 */
240 for (i=0; i<num_pools; i++) {
241 ret = pthreadpool_destroy(pools[i]);
243 fprintf(stderr, "pthreadpool_destroy failed: "
244 "%s\n", strerror(ret));
/* Wait for every thread that was started above. */
252 for (i=0; i<num_threads; i++) {
253 ret = pthread_join(states[i].tid, NULL);
255 fprintf(stderr, "pthread_join(%d) failed: %s\n",
/* Collect completions until all num_threads*num_jobs jobs were received. */
263 while (received < num_threads*num_jobs) {
/* Wait on all pools' signal fds with a timeout of 1000 ms. */
266 ret = poll(pfds, num_pools, 1000);
268 fprintf(stderr, "poll failed: %s\n",
273 fprintf(stderr, "\npoll timed out\n");
277 for (j=0; j<num_pools; j++) {
/* Tests whether this pool's fd reported neither POLLIN nor POLLHUP. */
279 if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
283 ret = pthreadpool_finished_job(pools[j]);
/* Valid ids span [0, num_jobs * num_threads). */
284 if ((ret < 0) || (ret >= num_jobs * num_threads)) {
285 fprintf(stderr, "invalid job number %d\n",
/* Every job id must have been counted exactly once. */
294 for (i=0; i<num_threads*num_jobs; i++) {
295 if (finished[i] != 1) {
296 fprintf(stderr, "finished[%d] = %d\n",
302 for (i=0; i<num_pools; i++) {
303 ret = pthreadpool_destroy(pools[i]);
305 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
325 fprintf(stderr, "test_init failed\n");
329 ret = test_jobs(10, 10000);
331 fprintf(stderr, "test_jobs failed\n");
335 ret = test_busydestroy();
337 fprintf(stderr, "test_busydestroy failed\n");
342 * Test 10 threads adding jobs on a single pool
344 ret = test_threaded_addjob(1, 10, 5, 5000);
/* Name the test that was actually run so a failure is attributed correctly. */
346 fprintf(stderr, "test_threaded_addjob failed\n");
351 * Test 10 threads on 3 pools to verify our fork handling
354 ret = test_threaded_addjob(3, 10, 5, 5000);
/* Same here: this result comes from test_threaded_addjob(), not test_jobs(). */
356 fprintf(stderr, "test_threaded_addjob failed\n");