Change the signature of pthreadpool_finished_job() to return 0
[mat/samba.git] / source3 / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "config.h"
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <assert.h>
28 #include <fcntl.h>
29 #include "system/time.h"
30 #include "system/filesys.h"
31
32 #include "pthreadpool.h"
33 #include "lib/util/dlinklist.h"
34
/*
 * A single queued work item. Items are chained through "next" into the
 * pool's job queue; "id" is the caller-chosen number that a worker writes
 * to the signalling pipe once fn(private_data) has returned.
 */
struct pthreadpool_job {
	/* Next job in the queue, NULL for the last one */
	struct pthreadpool_job *next;

	/* Caller-supplied job identifier */
	int id;

	/* Function to run in a worker thread, and its argument */
	void (*fn)(void *private_data);
	void *private_data;
};
41
/*
 * State of one thread pool. Apart from the list linkage, which is guarded
 * by the global pthreadpools_mutex, the fields are accessed with "mutex"
 * held.
 */
struct pthreadpool {
	/*
	 * Linkage in the global list of pools, needed for fork safety
	 */
	struct pthreadpool *prev;
	struct pthreadpool *next;

	/*
	 * Serializes access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Idle worker threads sleep on this condition variable
	 */
	pthread_cond_t condvar;

	/*
	 * Queue of pending jobs: head and tail
	 */
	struct pthreadpool_job *jobs;
	struct pthreadpool_job *last_job;

	/*
	 * Pipe used to report finished job ids
	 */
	int sig_pipe[2];

	/*
	 * Non-zero tells the worker threads to terminate
	 */
	int shutdown;

	/*
	 * Upper limit for the number of threads, 0 means unlimited
	 */
	int max_threads;

	/*
	 * Number of threads currently alive
	 */
	int num_threads;

	/*
	 * Number of threads currently waiting for work
	 */
	int num_idle;

	/*
	 * Threads that have terminated and still need a pthread_join().
	 * "exited" is grown with realloc() as needed.
	 */
	int num_exited;
	pthread_t *exited;
};
94
/*
 * Process-wide bookkeeping for fork safety: the list of all existing
 * pools, the mutex guarding that list, and the once-control that makes
 * sure the atfork handlers get installed a single time.
 */
static void pthreadpool_prep_atfork(void);

static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools;
100
101 /*
102  * Initialize a thread pool
103  */
104
105 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
106 {
107         struct pthreadpool *pool;
108         int ret;
109
110         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
111         if (pool == NULL) {
112                 return ENOMEM;
113         }
114
115         ret = pipe(pool->sig_pipe);
116         if (ret == -1) {
117                 int err = errno;
118                 free(pool);
119                 return err;
120         }
121
122         ret = pthread_mutex_init(&pool->mutex, NULL);
123         if (ret != 0) {
124                 close(pool->sig_pipe[0]);
125                 close(pool->sig_pipe[1]);
126                 free(pool);
127                 return ret;
128         }
129
130         ret = pthread_cond_init(&pool->condvar, NULL);
131         if (ret != 0) {
132                 pthread_mutex_destroy(&pool->mutex);
133                 close(pool->sig_pipe[0]);
134                 close(pool->sig_pipe[1]);
135                 free(pool);
136                 return ret;
137         }
138
139         pool->shutdown = 0;
140         pool->jobs = pool->last_job = NULL;
141         pool->num_threads = 0;
142         pool->num_exited = 0;
143         pool->exited = NULL;
144         pool->max_threads = max_threads;
145         pool->num_idle = 0;
146
147         ret = pthread_mutex_lock(&pthreadpools_mutex);
148         if (ret != 0) {
149                 pthread_cond_destroy(&pool->condvar);
150                 pthread_mutex_destroy(&pool->mutex);
151                 close(pool->sig_pipe[0]);
152                 close(pool->sig_pipe[1]);
153                 free(pool);
154                 return ret;
155         }
156         DLIST_ADD(pthreadpools, pool);
157
158         ret = pthread_mutex_unlock(&pthreadpools_mutex);
159         assert(ret == 0);
160
161         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
162
163         *presult = pool;
164
165         return 0;
166 }
167
168 static void pthreadpool_prepare(void)
169 {
170         int ret;
171         struct pthreadpool *pool;
172
173         ret = pthread_mutex_lock(&pthreadpools_mutex);
174         assert(ret == 0);
175
176         pool = pthreadpools;
177
178         while (pool != NULL) {
179                 ret = pthread_mutex_lock(&pool->mutex);
180                 assert(ret == 0);
181                 pool = pool->next;
182         }
183 }
184
185 static void pthreadpool_parent(void)
186 {
187         int ret;
188         struct pthreadpool *pool;
189
190         pool = DLIST_TAIL(pthreadpools);
191
192         while (1) {
193                 ret = pthread_mutex_unlock(&pool->mutex);
194                 assert(ret == 0);
195
196                 if (pool == pthreadpools) {
197                         break;
198                 }
199                 pool = pool->prev;
200         }
201
202         ret = pthread_mutex_unlock(&pthreadpools_mutex);
203         assert(ret == 0);
204 }
205
206 static void pthreadpool_child(void)
207 {
208         int ret;
209         struct pthreadpool *pool;
210
211         pool = DLIST_TAIL(pthreadpools);
212
213         while (1) {
214                 close(pool->sig_pipe[0]);
215                 close(pool->sig_pipe[1]);
216
217                 ret = pipe(pool->sig_pipe);
218                 assert(ret == 0);
219
220                 pool->num_threads = 0;
221
222                 pool->num_exited = 0;
223                 free(pool->exited);
224                 pool->exited = NULL;
225
226                 pool->num_idle = 0;
227
228                 while (pool->jobs != NULL) {
229                         struct pthreadpool_job *job;
230                         job = pool->jobs;
231                         pool->jobs = job->next;
232                         free(job);
233                 }
234                 pool->last_job = NULL;
235
236                 ret = pthread_mutex_unlock(&pool->mutex);
237                 assert(ret == 0);
238
239                 if (pool == pthreadpools) {
240                         break;
241                 }
242                 pool = pool->prev;
243         }
244
245         ret = pthread_mutex_unlock(&pthreadpools_mutex);
246         assert(ret == 0);
247 }
248
249 static void pthreadpool_prep_atfork(void)
250 {
251         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
252                        pthreadpool_child);
253 }
254
255 /*
256  * Return the file descriptor which becomes readable when a job has
257  * finished
258  */
259
260 int pthreadpool_signal_fd(struct pthreadpool *pool)
261 {
262         return pool->sig_pipe[0];
263 }
264
265 /*
266  * Do a pthread_join() on all children that have exited, pool->mutex must be
267  * locked
268  */
269 static void pthreadpool_join_children(struct pthreadpool *pool)
270 {
271         int i;
272
273         for (i=0; i<pool->num_exited; i++) {
274                 pthread_join(pool->exited[i], NULL);
275         }
276         pool->num_exited = 0;
277
278         /*
279          * Deliberately not free and NULL pool->exited. That will be
280          * re-used by realloc later.
281          */
282 }
283
284 /*
285  * Fetch a finished job number from the signal pipe
286  */
287
288 int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
289 {
290         int ret_jobid;
291         ssize_t nread;
292
293         nread = -1;
294         errno = EINTR;
295
296         while ((nread == -1) && (errno == EINTR)) {
297                 nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
298         }
299         if (nread == -1) {
300                 return errno;
301         }
302         if (nread != sizeof(int)) {
303                 return EINVAL;
304         }
305         *jobid = ret_jobid;
306         return 0;
307 }
308
309 /*
310  * Destroy a thread pool, finishing all threads working for it
311  */
312
313 int pthreadpool_destroy(struct pthreadpool *pool)
314 {
315         int ret, ret1;
316
317         ret = pthread_mutex_lock(&pool->mutex);
318         if (ret != 0) {
319                 return ret;
320         }
321
322         if ((pool->jobs != NULL) || pool->shutdown) {
323                 ret = pthread_mutex_unlock(&pool->mutex);
324                 assert(ret == 0);
325                 return EBUSY;
326         }
327
328         if (pool->num_threads > 0) {
329                 /*
330                  * We have active threads, tell them to finish, wait for that.
331                  */
332
333                 pool->shutdown = 1;
334
335                 if (pool->num_idle > 0) {
336                         /*
337                          * Wake the idle threads. They will find pool->quit to
338                          * be set and exit themselves
339                          */
340                         ret = pthread_cond_broadcast(&pool->condvar);
341                         if (ret != 0) {
342                                 pthread_mutex_unlock(&pool->mutex);
343                                 return ret;
344                         }
345                 }
346
347                 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
348
349                         if (pool->num_exited > 0) {
350                                 pthreadpool_join_children(pool);
351                                 continue;
352                         }
353                         /*
354                          * A thread that shuts down will also signal
355                          * pool->condvar
356                          */
357                         ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
358                         if (ret != 0) {
359                                 pthread_mutex_unlock(&pool->mutex);
360                                 return ret;
361                         }
362                 }
363         }
364
365         ret = pthread_mutex_unlock(&pool->mutex);
366         if (ret != 0) {
367                 return ret;
368         }
369         ret = pthread_mutex_destroy(&pool->mutex);
370         ret1 = pthread_cond_destroy(&pool->condvar);
371
372         if (ret != 0) {
373                 return ret;
374         }
375         if (ret1 != 0) {
376                 return ret1;
377         }
378
379         ret = pthread_mutex_lock(&pthreadpools_mutex);
380         if (ret != 0) {
381                 return ret;
382         }
383         DLIST_REMOVE(pthreadpools, pool);
384         ret = pthread_mutex_unlock(&pthreadpools_mutex);
385         assert(ret == 0);
386
387         close(pool->sig_pipe[0]);
388         pool->sig_pipe[0] = -1;
389
390         close(pool->sig_pipe[1]);
391         pool->sig_pipe[1] = -1;
392
393         free(pool->exited);
394         free(pool);
395
396         return 0;
397 }
398
399 /*
400  * Prepare for pthread_exit(), pool->mutex must be locked
401  */
402 static void pthreadpool_server_exit(struct pthreadpool *pool)
403 {
404         pthread_t *exited;
405
406         pool->num_threads -= 1;
407
408         exited = (pthread_t *)realloc(
409                 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
410
411         if (exited == NULL) {
412                 /* lost a thread status */
413                 return;
414         }
415         pool->exited = exited;
416
417         pool->exited[pool->num_exited] = pthread_self();
418         pool->num_exited += 1;
419 }
420
421 static void *pthreadpool_server(void *arg)
422 {
423         struct pthreadpool *pool = (struct pthreadpool *)arg;
424         int res;
425
426         res = pthread_mutex_lock(&pool->mutex);
427         if (res != 0) {
428                 return NULL;
429         }
430
431         while (1) {
432                 struct timespec ts;
433                 struct pthreadpool_job *job;
434
435                 /*
436                  * idle-wait at most 1 second. If nothing happens in that
437                  * time, exit this thread.
438                  */
439
440                 clock_gettime(CLOCK_REALTIME, &ts);
441                 ts.tv_sec += 1;
442
443                 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
444
445                         pool->num_idle += 1;
446                         res = pthread_cond_timedwait(
447                                 &pool->condvar, &pool->mutex, &ts);
448                         pool->num_idle -= 1;
449
450                         if (res == ETIMEDOUT) {
451
452                                 if (pool->jobs == NULL) {
453                                         /*
454                                          * we timed out and still no work for
455                                          * us. Exit.
456                                          */
457                                         pthreadpool_server_exit(pool);
458                                         pthread_mutex_unlock(&pool->mutex);
459                                         return NULL;
460                                 }
461
462                                 break;
463                         }
464                         assert(res == 0);
465                 }
466
467                 job = pool->jobs;
468
469                 if (job != NULL) {
470                         ssize_t written;
471
472                         /*
473                          * Ok, there's work for us to do, remove the job from
474                          * the pthreadpool list
475                          */
476                         pool->jobs = job->next;
477                         if (pool->last_job == job) {
478                                 pool->last_job = NULL;
479                         }
480
481                         /*
482                          * Do the work with the mutex unlocked
483                          */
484
485                         res = pthread_mutex_unlock(&pool->mutex);
486                         assert(res == 0);
487
488                         job->fn(job->private_data);
489
490                         written = write(pool->sig_pipe[1], &job->id,
491                                         sizeof(int));
492
493                         free(job);
494
495                         res = pthread_mutex_lock(&pool->mutex);
496                         assert(res == 0);
497
498                         if (written != sizeof(int)) {
499                                 pthreadpool_server_exit(pool);
500                                 pthread_mutex_unlock(&pool->mutex);
501                                 return NULL;
502                         }
503                 }
504
505                 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
506                         /*
507                          * No more work to do and we're asked to shut down, so
508                          * exit
509                          */
510                         pthreadpool_server_exit(pool);
511
512                         if (pool->num_threads == 0) {
513                                 /*
514                                  * Ping the main thread waiting for all of us
515                                  * workers to have quit.
516                                  */
517                                 pthread_cond_broadcast(&pool->condvar);
518                         }
519
520                         pthread_mutex_unlock(&pool->mutex);
521                         return NULL;
522                 }
523         }
524 }
525
526 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
527                         void (*fn)(void *private_data), void *private_data)
528 {
529         struct pthreadpool_job *job;
530         pthread_t thread_id;
531         int res;
532         sigset_t mask, omask;
533
534         job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
535         if (job == NULL) {
536                 return ENOMEM;
537         }
538
539         job->fn = fn;
540         job->private_data = private_data;
541         job->id = job_id;
542         job->next = NULL;
543
544         res = pthread_mutex_lock(&pool->mutex);
545         if (res != 0) {
546                 free(job);
547                 return res;
548         }
549
550         if (pool->shutdown) {
551                 /*
552                  * Protect against the pool being shut down while
553                  * trying to add a job
554                  */
555                 res = pthread_mutex_unlock(&pool->mutex);
556                 assert(res == 0);
557                 free(job);
558                 return EINVAL;
559         }
560
561         /*
562          * Just some cleanup under the mutex
563          */
564         pthreadpool_join_children(pool);
565
566         /*
567          * Add job to the end of the queue
568          */
569         if (pool->jobs == NULL) {
570                 pool->jobs = job;
571         }
572         else {
573                 pool->last_job->next = job;
574         }
575         pool->last_job = job;
576
577         if (pool->num_idle > 0) {
578                 /*
579                  * We have idle threads, wake one.
580                  */
581                 res = pthread_cond_signal(&pool->condvar);
582                 pthread_mutex_unlock(&pool->mutex);
583                 return res;
584         }
585
586         if ((pool->max_threads != 0) &&
587             (pool->num_threads >= pool->max_threads)) {
588                 /*
589                  * No more new threads, we just queue the request
590                  */
591                 pthread_mutex_unlock(&pool->mutex);
592                 return 0;
593         }
594
595         /*
596          * Create a new worker thread. It should not receive any signals.
597          */
598
599         sigfillset(&mask);
600
601         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
602         if (res != 0) {
603                 pthread_mutex_unlock(&pool->mutex);
604                 return res;
605         }
606
607         res = pthread_create(&thread_id, NULL, pthreadpool_server,
608                                 (void *)pool);
609         if (res == 0) {
610                 pool->num_threads += 1;
611         }
612
613         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
614
615         pthread_mutex_unlock(&pool->mutex);
616         return res;
617 }