667ee01784e74feb29e304ef66d90e8cbef5113d
[mat/samba.git] / source3 / lib / pthreadpool / tests.c
#include <stdio.h>
#include <string.h>
#include <poll.h>
#include <errno.h>
#include <stdlib.h>
#include <pthread.h>
#include <unistd.h>
#include <sys/wait.h>
#include "pthreadpool.h"
9
/*
 * Smoke test: create a pool with one thread and destroy it right away.
 *
 * Returns 0 on success, -1 after printing a diagnostic to stderr.
 */
static int test_init(void)
{
        struct pthreadpool *p;
        int ret;

        ret = pthreadpool_init(1, &p);
        if (ret != 0) {
                fprintf(stderr, "pthreadpool_init failed: %s\n",
                        strerror(ret));
                return -1;
        }
        ret = pthreadpool_destroy(p);
        if (ret != 0) {
                /* Report the call that actually failed. */
                fprintf(stderr, "pthreadpool_destroy failed: %s\n",
                        strerror(ret));
                return -1;
        }
        return 0;
}
29
/*
 * Job body shared by the tests: wait for a while without watching any
 * file descriptor.
 *
 * ptr points to an int that is handed to poll() as its timeout. With no
 * descriptors to report on, anything but a 0 return is unexpected and is
 * logged to stderr together with the current errno text.
 */
static void test_sleep(void *ptr)
{
        const int *wait_for = ptr;
        int rc = poll(NULL, 0, *wait_for);

        if (rc == 0) {
                return;
        }
        fprintf(stderr, "poll returned %d (%s)\n",
                rc, strerror(errno));
}
40
41 static int test_jobs(int num_threads, int num_jobs)
42 {
43         char *finished;
44         struct pthreadpool *p;
45         int timeout = 1;
46         int i, ret;
47
48         finished = (char *)calloc(1, num_jobs);
49         if (finished == NULL) {
50                 fprintf(stderr, "calloc failed\n");
51                 return -1;
52         }
53
54         ret = pthreadpool_init(num_threads, &p);
55         if (ret != 0) {
56                 fprintf(stderr, "pthreadpool_init failed: %s\n",
57                         strerror(ret));
58                 return -1;
59         }
60
61         for (i=0; i<num_jobs; i++) {
62                 ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
63                 if (ret != 0) {
64                         fprintf(stderr, "pthreadpool_add_job failed: %s\n",
65                                 strerror(ret));
66                         return -1;
67                 }
68         }
69
70         for (i=0; i<num_jobs; i++) {
71                 ret = pthreadpool_finished_job(p);
72                 if ((ret < 0) || (ret >= num_jobs)) {
73                         fprintf(stderr, "invalid job number %d\n", ret);
74                         return -1;
75                 }
76                 finished[ret] += 1;
77         }
78
79         for (i=0; i<num_jobs; i++) {
80                 if (finished[i] != 1) {
81                         fprintf(stderr, "finished[%d] = %d\n",
82                                 i, finished[i]);
83                         return -1;
84                 }
85         }
86
87         ret = pthreadpool_destroy(p);
88         if (ret != 0) {
89                 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
90                         strerror(ret));
91                 return -1;
92         }
93
94         free(finished);
95         return 0;
96 }
97
98 static int test_busydestroy(void)
99 {
100         struct pthreadpool *p;
101         int timeout = 50;
102         struct pollfd pfd;
103         int ret;
104
105         ret = pthreadpool_init(1, &p);
106         if (ret != 0) {
107                 fprintf(stderr, "pthreadpool_init failed: %s\n",
108                         strerror(ret));
109                 return -1;
110         }
111         ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
112         if (ret != 0) {
113                 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
114                         strerror(ret));
115                 return -1;
116         }
117         ret = pthreadpool_destroy(p);
118         if (ret != EBUSY) {
119                 fprintf(stderr, "Could destroy a busy pool\n");
120                 return -1;
121         }
122
123         pfd.fd = pthreadpool_signal_fd(p);
124         pfd.events = POLLIN|POLLERR;
125
126         poll(&pfd, 1, -1);
127
128         ret = pthreadpool_destroy(p);
129         if (ret != 0) {
130                 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
131                         strerror(ret));
132                 return -1;
133         }
134         return 0;
135 }
136
/*
 * Parameters for one test_threaded_worker() thread.
 */
struct threaded_state {
        /* Thread handle, filled in by pthread_create(). */
        pthread_t tid;
        /* Pool the thread queues its jobs on. */
        struct pthreadpool *p;
        /* Job id used for the first job; later jobs count up from it. */
        int start_job;
        /* Number of jobs the thread queues. */
        int num_jobs;
        /* Handed by address to test_sleep() as its poll() timeout. */
        int timeout;
};
144
145 static void *test_threaded_worker(void *p)
146 {
147         struct threaded_state *state = (struct threaded_state *)p;
148         int i;
149
150         for (i=0; i<state->num_jobs; i++) {
151                 int ret = pthreadpool_add_job(state->p, state->start_job + i,
152                                               test_sleep, &state->timeout);
153                 if (ret != 0) {
154                         fprintf(stderr, "pthreadpool_add_job failed: %s\n",
155                                 strerror(ret));
156                         return NULL;
157                 }
158         }
159         return NULL;
160 }
161
162 static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
163                                 int num_jobs)
164 {
165         struct pthreadpool **pools;
166         struct threaded_state *states;
167         struct threaded_state *state;
168         struct pollfd *pfds;
169         char *finished;
170         pid_t child;
171         int i, ret, poolnum;
172         int received;
173
174         states = calloc(num_threads, sizeof(struct threaded_state));
175         if (states == NULL) {
176                 fprintf(stderr, "calloc failed\n");
177                 return -1;
178         }
179
180         finished = calloc(num_threads * num_jobs, 1);
181         if (finished == NULL) {
182                 fprintf(stderr, "calloc failed\n");
183                 return -1;
184         }
185
186         pools = calloc(num_pools, sizeof(struct pthreadpool *));
187         if (pools == NULL) {
188                 fprintf(stderr, "calloc failed\n");
189                 return -1;
190         }
191
192         pfds = calloc(num_pools, sizeof(struct pollfd));
193         if (pfds == NULL) {
194                 fprintf(stderr, "calloc failed\n");
195                 return -1;
196         }
197
198         for (i=0; i<num_pools; i++) {
199                 ret = pthreadpool_init(poolsize, &pools[i]);
200                 if (ret != 0) {
201                         fprintf(stderr, "pthreadpool_init failed: %s\n",
202                                 strerror(ret));
203                         return -1;
204                 }
205                 pfds[i].fd = pthreadpool_signal_fd(pools[i]);
206                 pfds[i].events = POLLIN|POLLHUP;
207         }
208
209         poolnum = 0;
210
211         for (i=0; i<num_threads; i++) {
212                 state = &states[i];
213
214                 state->p = pools[poolnum];
215                 poolnum = (poolnum + 1) % num_pools;
216
217                 state->num_jobs = num_jobs;
218                 state->timeout = 1;
219                 state->start_job = i * num_jobs;
220
221                 ret = pthread_create(&state->tid, NULL, test_threaded_worker,
222                                      state);
223                 if (ret != 0) {
224                         fprintf(stderr, "pthread_create failed: %s\n",
225                                 strerror(ret));
226                         return -1;
227                 }
228         }
229
230         if (random() % 1) {
231                 poll(NULL, 0, 1);
232         }
233
234         child = fork();
235         if (child < 0) {
236                 fprintf(stderr, "fork failed: %s\n", strerror(errno));
237                 return -1;
238         }
239         if (child == 0) {
240                 for (i=0; i<num_pools; i++) {
241                         ret = pthreadpool_destroy(pools[i]);
242                         if (ret != 0) {
243                                 fprintf(stderr, "pthreadpool_destroy failed: "
244                                         "%s\n", strerror(ret));
245                                 exit(1);
246                         }
247                 }
248                 /* child */
249                 exit(0);
250         }
251
252         for (i=0; i<num_threads; i++) {
253                 ret = pthread_join(states[i].tid, NULL);
254                 if (ret != 0) {
255                         fprintf(stderr, "pthread_join(%d) failed: %s\n",
256                                 i, strerror(ret));
257                         return -1;
258                 }
259         }
260
261         received = 0;
262
263         while (received < num_threads*num_jobs) {
264                 int j;
265
266                 ret = poll(pfds, num_pools, 1000);
267                 if (ret == -1) {
268                         fprintf(stderr, "poll failed: %s\n",
269                                 strerror(errno));
270                         return -1;
271                 }
272                 if (ret == 0) {
273                         fprintf(stderr, "\npoll timed out\n");
274                         break;
275                 }
276
277                 for (j=0; j<num_pools; j++) {
278
279                         if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
280                                 continue;
281                         }
282
283                         ret = pthreadpool_finished_job(pools[j]);
284                         if ((ret < 0) || (ret >= num_jobs * num_threads)) {
285                                 fprintf(stderr, "invalid job number %d\n",
286                                         ret);
287                                 return -1;
288                         }
289                         finished[ret] += 1;
290                         received += 1;
291                 }
292         }
293
294         for (i=0; i<num_threads*num_jobs; i++) {
295                 if (finished[i] != 1) {
296                         fprintf(stderr, "finished[%d] = %d\n",
297                                 i, finished[i]);
298                         return -1;
299                 }
300         }
301
302         for (i=0; i<num_pools; i++) {
303                 ret = pthreadpool_destroy(pools[i]);
304                 if (ret != 0) {
305                         fprintf(stderr, "pthreadpool_destroy failed: %s\n",
306                                 strerror(ret));
307                         return -1;
308                 }
309         }
310
311         free(pfds);
312         free(pools);
313         free(states);
314         free(finished);
315
316         return 0;
317 }
318
/*
 * Run the pthreadpool tests in sequence and stop at the first failure.
 *
 * Returns 0 and prints "success" when all tests pass, 1 otherwise after
 * naming the failed test on stderr.
 */
int main(void)
{
        int ret;

        ret = test_init();
        if (ret != 0) {
                fprintf(stderr, "test_init failed\n");
                return 1;
        }

        ret = test_jobs(10, 10000);
        if (ret != 0) {
                fprintf(stderr, "test_jobs failed\n");
                return 1;
        }

        ret = test_busydestroy();
        if (ret != 0) {
                fprintf(stderr, "test_busydestroy failed\n");
                return 1;
        }

        /*
         * Test 10 threads adding jobs on a single pool
         */
        ret = test_threaded_addjob(1, 10, 5, 5000);
        if (ret != 0) {
                fprintf(stderr, "test_threaded_addjob failed\n");
                return 1;
        }

        /*
         * Test 10 threads on 3 pools to verify our fork handling
         * works right.
         */
        ret = test_threaded_addjob(3, 10, 5, 5000);
        if (ret != 0) {
                fprintf(stderr, "test_threaded_addjob failed\n");
                return 1;
        }

        printf("success\n");
        return 0;
}