Change the signature of pthreadpool_finished_job() to return 0
[mat/samba.git] / source3 / lib / pthreadpool / tests.c
1 #include <stdio.h>
2 #include <string.h>
3 #include <poll.h>
4 #include <errno.h>
5 #include <stdlib.h>
6 #include <pthread.h>
7 #include <unistd.h>
8 #include "pthreadpool.h"
9
/*
 * Smoke test: create a pool with pthreadpool_init(1, ...) and immediately
 * destroy it again.
 *
 * Returns 0 on success, -1 if either call reports an error (the error
 * is printed to stderr).
 */
static int test_init(void)
{
        struct pthreadpool *p;
        int ret;

        ret = pthreadpool_init(1, &p);
        if (ret != 0) {
                fprintf(stderr, "pthreadpool_init failed: %s\n",
                        strerror(ret));
                return -1;
        }
        ret = pthreadpool_destroy(p);
        if (ret != 0) {
                /* Name the call that actually failed. */
                fprintf(stderr, "pthreadpool_destroy failed: %s\n",
                        strerror(ret));
                return -1;
        }
        return 0;
}
29
/*
 * Job function used by the tests: block in poll() with no file
 * descriptors for the number of milliseconds stored in the int that
 * ptr points to. Anything other than a plain timeout (poll() returning
 * 0) is logged to stderr.
 */
static void test_sleep(void *ptr)
{
        const int *timeout_ms = ptr;
        int rc = poll(NULL, 0, *timeout_ms);

        if (rc == 0) {
                /* timeout expired, nothing to report */
                return;
        }
        fprintf(stderr, "poll returned %d (%s)\n", rc, strerror(errno));
}
40
41 static int test_jobs(int num_threads, int num_jobs)
42 {
43         char *finished;
44         struct pthreadpool *p;
45         int timeout = 1;
46         int i, ret;
47
48         finished = (char *)calloc(1, num_jobs);
49         if (finished == NULL) {
50                 fprintf(stderr, "calloc failed\n");
51                 return -1;
52         }
53
54         ret = pthreadpool_init(num_threads, &p);
55         if (ret != 0) {
56                 fprintf(stderr, "pthreadpool_init failed: %s\n",
57                         strerror(ret));
58                 return -1;
59         }
60
61         for (i=0; i<num_jobs; i++) {
62                 ret = pthreadpool_add_job(p, i, test_sleep, &timeout);
63                 if (ret != 0) {
64                         fprintf(stderr, "pthreadpool_add_job failed: %s\n",
65                                 strerror(ret));
66                         return -1;
67                 }
68         }
69
70         for (i=0; i<num_jobs; i++) {
71                 int jobid = -1;
72                 ret = pthreadpool_finished_job(p, &jobid);
73                 if ((ret != 0) || (jobid >= num_jobs)) {
74                         fprintf(stderr, "invalid job number %d\n", jobid);
75                         return -1;
76                 }
77                 finished[jobid] += 1;
78         }
79
80         for (i=0; i<num_jobs; i++) {
81                 if (finished[i] != 1) {
82                         fprintf(stderr, "finished[%d] = %d\n",
83                                 i, finished[i]);
84                         return -1;
85                 }
86         }
87
88         ret = pthreadpool_destroy(p);
89         if (ret != 0) {
90                 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
91                         strerror(ret));
92                 return -1;
93         }
94
95         free(finished);
96         return 0;
97 }
98
99 static int test_busydestroy(void)
100 {
101         struct pthreadpool *p;
102         int timeout = 50;
103         struct pollfd pfd;
104         int ret;
105
106         ret = pthreadpool_init(1, &p);
107         if (ret != 0) {
108                 fprintf(stderr, "pthreadpool_init failed: %s\n",
109                         strerror(ret));
110                 return -1;
111         }
112         ret = pthreadpool_add_job(p, 1, test_sleep, &timeout);
113         if (ret != 0) {
114                 fprintf(stderr, "pthreadpool_add_job failed: %s\n",
115                         strerror(ret));
116                 return -1;
117         }
118         ret = pthreadpool_destroy(p);
119         if (ret != EBUSY) {
120                 fprintf(stderr, "Could destroy a busy pool\n");
121                 return -1;
122         }
123
124         pfd.fd = pthreadpool_signal_fd(p);
125         pfd.events = POLLIN|POLLERR;
126
127         poll(&pfd, 1, -1);
128
129         ret = pthreadpool_destroy(p);
130         if (ret != 0) {
131                 fprintf(stderr, "pthreadpool_destroy failed: %s\n",
132                         strerror(ret));
133                 return -1;
134         }
135         return 0;
136 }
137
/*
 * Parameters handed to one test_threaded_worker() thread; one instance
 * per submitter thread, filled in by test_threaded_addjob().
 */
struct threaded_state {
        pthread_t tid;          /* thread handle written by pthread_create() */
        struct pthreadpool *p;  /* pool this thread adds its jobs to */
        int start_job;          /* job id used for the first job */
        int num_jobs;           /* number of jobs to add, ids are consecutive */
        int timeout;            /* its address is passed to test_sleep() as job argument */
};
145
146 static void *test_threaded_worker(void *p)
147 {
148         struct threaded_state *state = (struct threaded_state *)p;
149         int i;
150
151         for (i=0; i<state->num_jobs; i++) {
152                 int ret = pthreadpool_add_job(state->p, state->start_job + i,
153                                               test_sleep, &state->timeout);
154                 if (ret != 0) {
155                         fprintf(stderr, "pthreadpool_add_job failed: %s\n",
156                                 strerror(ret));
157                         return NULL;
158                 }
159         }
160         return NULL;
161 }
162
163 static int test_threaded_addjob(int num_pools, int num_threads, int poolsize,
164                                 int num_jobs)
165 {
166         struct pthreadpool **pools;
167         struct threaded_state *states;
168         struct threaded_state *state;
169         struct pollfd *pfds;
170         char *finished;
171         pid_t child;
172         int i, ret, poolnum;
173         int received;
174
175         states = calloc(num_threads, sizeof(struct threaded_state));
176         if (states == NULL) {
177                 fprintf(stderr, "calloc failed\n");
178                 return -1;
179         }
180
181         finished = calloc(num_threads * num_jobs, 1);
182         if (finished == NULL) {
183                 fprintf(stderr, "calloc failed\n");
184                 return -1;
185         }
186
187         pools = calloc(num_pools, sizeof(struct pthreadpool *));
188         if (pools == NULL) {
189                 fprintf(stderr, "calloc failed\n");
190                 return -1;
191         }
192
193         pfds = calloc(num_pools, sizeof(struct pollfd));
194         if (pfds == NULL) {
195                 fprintf(stderr, "calloc failed\n");
196                 return -1;
197         }
198
199         for (i=0; i<num_pools; i++) {
200                 ret = pthreadpool_init(poolsize, &pools[i]);
201                 if (ret != 0) {
202                         fprintf(stderr, "pthreadpool_init failed: %s\n",
203                                 strerror(ret));
204                         return -1;
205                 }
206                 pfds[i].fd = pthreadpool_signal_fd(pools[i]);
207                 pfds[i].events = POLLIN|POLLHUP;
208         }
209
210         poolnum = 0;
211
212         for (i=0; i<num_threads; i++) {
213                 state = &states[i];
214
215                 state->p = pools[poolnum];
216                 poolnum = (poolnum + 1) % num_pools;
217
218                 state->num_jobs = num_jobs;
219                 state->timeout = 1;
220                 state->start_job = i * num_jobs;
221
222                 ret = pthread_create(&state->tid, NULL, test_threaded_worker,
223                                      state);
224                 if (ret != 0) {
225                         fprintf(stderr, "pthread_create failed: %s\n",
226                                 strerror(ret));
227                         return -1;
228                 }
229         }
230
231         if (random() % 1) {
232                 poll(NULL, 0, 1);
233         }
234
235         child = fork();
236         if (child < 0) {
237                 fprintf(stderr, "fork failed: %s\n", strerror(errno));
238                 return -1;
239         }
240         if (child == 0) {
241                 for (i=0; i<num_pools; i++) {
242                         ret = pthreadpool_destroy(pools[i]);
243                         if (ret != 0) {
244                                 fprintf(stderr, "pthreadpool_destroy failed: "
245                                         "%s\n", strerror(ret));
246                                 exit(1);
247                         }
248                 }
249                 /* child */
250                 exit(0);
251         }
252
253         for (i=0; i<num_threads; i++) {
254                 ret = pthread_join(states[i].tid, NULL);
255                 if (ret != 0) {
256                         fprintf(stderr, "pthread_join(%d) failed: %s\n",
257                                 i, strerror(ret));
258                         return -1;
259                 }
260         }
261
262         received = 0;
263
264         while (received < num_threads*num_jobs) {
265                 int j;
266
267                 ret = poll(pfds, num_pools, 1000);
268                 if (ret == -1) {
269                         fprintf(stderr, "poll failed: %s\n",
270                                 strerror(errno));
271                         return -1;
272                 }
273                 if (ret == 0) {
274                         fprintf(stderr, "\npoll timed out\n");
275                         break;
276                 }
277
278                 for (j=0; j<num_pools; j++) {
279                         int jobid = -1;
280
281                         if ((pfds[j].revents & (POLLIN|POLLHUP)) == 0) {
282                                 continue;
283                         }
284
285                         ret = pthreadpool_finished_job(pools[j], &jobid);
286                         if ((ret != 0) || (jobid >= num_jobs * num_threads)) {
287                                 fprintf(stderr, "invalid job number %d\n",
288                                         jobid);
289                                 return -1;
290                         }
291                         finished[jobid] += 1;
292                         received += 1;
293                 }
294         }
295
296         for (i=0; i<num_threads*num_jobs; i++) {
297                 if (finished[i] != 1) {
298                         fprintf(stderr, "finished[%d] = %d\n",
299                                 i, finished[i]);
300                         return -1;
301                 }
302         }
303
304         for (i=0; i<num_pools; i++) {
305                 ret = pthreadpool_destroy(pools[i]);
306                 if (ret != 0) {
307                         fprintf(stderr, "pthreadpool_destroy failed: %s\n",
308                                 strerror(ret));
309                         return -1;
310                 }
311         }
312
313         free(pfds);
314         free(pools);
315         free(states);
316         free(finished);
317
318         return 0;
319 }
320
/*
 * Run all pthreadpool tests in sequence and stop at the first failure.
 *
 * Prints "success" and returns 0 if every test passed; otherwise names
 * the failing test on stderr and returns 1.
 */
int main(void)
{
        int ret;

        ret = test_init();
        if (ret != 0) {
                fprintf(stderr, "test_init failed\n");
                return 1;
        }

        ret = test_jobs(10, 10000);
        if (ret != 0) {
                fprintf(stderr, "test_jobs failed\n");
                return 1;
        }

        ret = test_busydestroy();
        if (ret != 0) {
                fprintf(stderr, "test_busydestroy failed\n");
                return 1;
        }

        /*
         * Test 10 threads adding jobs on a single pool
         */
        ret = test_threaded_addjob(1, 10, 5, 5000);
        if (ret != 0) {
                /* name the test that really failed, not test_jobs */
                fprintf(stderr, "test_threaded_addjob failed\n");
                return 1;
        }

        /*
         * Test 10 threads on 3 pools to verify our fork handling
         * works right.
         */
        ret = test_threaded_addjob(3, 10, 5, 5000);
        if (ret != 0) {
                fprintf(stderr, "test_threaded_addjob failed\n");
                return 1;
        }

        printf("success\n");
        return 0;
}