c916dc0f8d5ca1dc0f112c310a1f8d676386e5fd
[mat/samba.git] / source3 / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "config.h"
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <assert.h>
28 #include <fcntl.h>
29 #include "system/time.h"
30 #include "system/filesys.h"
31
32 #include "pthreadpool.h"
33 #include "lib/util/dlinklist.h"
34
/*
 * One queued unit of work. Jobs are chained into a singly linked FIFO
 * whose head and tail live in struct pthreadpool (jobs / last_job).
 */
struct pthreadpool_job {
	/* Next job in the queue, NULL for the last one */
	struct pthreadpool_job *next;

	/* Caller-chosen id, written to the signal pipe after fn returned */
	int id;

	/* The work itself: a worker thread calls fn(private_data) */
	void (*fn)(void *);
	void *private_data;
};
41
/*
 * State of one thread pool. Everything except the list linkage is
 * protected by "mutex"; the list linkage is protected by the file-scope
 * pthreadpools_mutex.
 */
struct pthreadpool {
	/*
	 * Linkage in the global list of pools, which the fork handlers
	 * walk to keep every pool consistent across fork()
	 */
	struct pthreadpool *prev;
	struct pthreadpool *next;

	/*
	 * Serializes access to this struct
	 */
	pthread_mutex_t mutex;

	/*
	 * Idle workers sleep here until a job arrives; the thread
	 * destroying the pool waits here for the workers to leave
	 */
	pthread_cond_t condvar;

	/*
	 * FIFO of pending jobs: head and tail
	 */
	struct pthreadpool_job *jobs;
	struct pthreadpool_job *last_job;

	/*
	 * Workers write the id of a finished job to sig_pipe[1], the
	 * pool user reads it from sig_pipe[0]
	 */
	int sig_pipe[2];

	/*
	 * Non-zero once the pool is being destroyed; tells the workers
	 * to exit
	 */
	int shutdown;

	/*
	 * Upper bound for num_threads, 0 means "no limit"
	 */
	int max_threads;

	/*
	 * Number of worker threads currently alive
	 */
	int num_threads;

	/*
	 * Number of workers currently waiting on condvar
	 */
	int num_idle;

	/*
	 * Threads that have terminated but have not been joined yet.
	 * The array is grown with realloc() and kept around for re-use.
	 */
	int num_exited;
	pthread_t *exited;
};
94
/*
 * Global list of all existing pools together with the mutex guarding
 * it. The fork handlers walk this list.
 */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
static struct pthreadpool *pthreadpools = NULL;

/*
 * Makes sure the fork handlers are registered only once, no matter how
 * many pools are created
 */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
100
101 /*
102  * Initialize a thread pool
103  */
104
105 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
106 {
107         struct pthreadpool *pool;
108         int ret;
109
110         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
111         if (pool == NULL) {
112                 return ENOMEM;
113         }
114
115         ret = pipe(pool->sig_pipe);
116         if (ret == -1) {
117                 int err = errno;
118                 free(pool);
119                 return err;
120         }
121
122         ret = pthread_mutex_init(&pool->mutex, NULL);
123         if (ret != 0) {
124                 close(pool->sig_pipe[0]);
125                 close(pool->sig_pipe[1]);
126                 free(pool);
127                 return ret;
128         }
129
130         ret = pthread_cond_init(&pool->condvar, NULL);
131         if (ret != 0) {
132                 pthread_mutex_destroy(&pool->mutex);
133                 close(pool->sig_pipe[0]);
134                 close(pool->sig_pipe[1]);
135                 free(pool);
136                 return ret;
137         }
138
139         pool->shutdown = 0;
140         pool->jobs = pool->last_job = NULL;
141         pool->num_threads = 0;
142         pool->num_exited = 0;
143         pool->exited = NULL;
144         pool->max_threads = max_threads;
145         pool->num_idle = 0;
146
147         ret = pthread_mutex_lock(&pthreadpools_mutex);
148         if (ret != 0) {
149                 pthread_cond_destroy(&pool->condvar);
150                 pthread_mutex_destroy(&pool->mutex);
151                 close(pool->sig_pipe[0]);
152                 close(pool->sig_pipe[1]);
153                 free(pool);
154                 return ret;
155         }
156         DLIST_ADD(pthreadpools, pool);
157
158         ret = pthread_mutex_unlock(&pthreadpools_mutex);
159         assert(ret == 0);
160
161         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
162
163         *presult = pool;
164
165         return 0;
166 }
167
168 static void pthreadpool_prepare(void)
169 {
170         int ret;
171         struct pthreadpool *pool;
172
173         ret = pthread_mutex_lock(&pthreadpools_mutex);
174         assert(ret == 0);
175
176         pool = pthreadpools;
177
178         while (pool != NULL) {
179                 ret = pthread_mutex_lock(&pool->mutex);
180                 assert(ret == 0);
181                 pool = pool->next;
182         }
183 }
184
185 static void pthreadpool_parent(void)
186 {
187         int ret;
188         struct pthreadpool *pool;
189
190         pool = DLIST_TAIL(pthreadpools);
191
192         while (1) {
193                 ret = pthread_mutex_unlock(&pool->mutex);
194                 assert(ret == 0);
195
196                 if (pool == pthreadpools) {
197                         break;
198                 }
199                 pool = pool->prev;
200         }
201
202         ret = pthread_mutex_unlock(&pthreadpools_mutex);
203         assert(ret == 0);
204 }
205
206 static void pthreadpool_child(void)
207 {
208         int ret;
209         struct pthreadpool *pool;
210
211         pool = DLIST_TAIL(pthreadpools);
212
213         while (1) {
214                 close(pool->sig_pipe[0]);
215                 close(pool->sig_pipe[1]);
216
217                 ret = pipe(pool->sig_pipe);
218                 assert(ret == 0);
219
220                 pool->num_threads = 0;
221
222                 pool->num_exited = 0;
223                 free(pool->exited);
224                 pool->exited = NULL;
225
226                 pool->num_idle = 0;
227
228                 while (pool->jobs != NULL) {
229                         struct pthreadpool_job *job;
230                         job = pool->jobs;
231                         pool->jobs = job->next;
232                         free(job);
233                 }
234                 pool->last_job = NULL;
235
236                 ret = pthread_mutex_unlock(&pool->mutex);
237                 assert(ret == 0);
238
239                 if (pool == pthreadpools) {
240                         break;
241                 }
242                 pool = pool->prev;
243         }
244
245         ret = pthread_mutex_unlock(&pthreadpools_mutex);
246         assert(ret == 0);
247 }
248
249 static void pthreadpool_prep_atfork(void)
250 {
251         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
252                        pthreadpool_child);
253 }
254
255 /*
256  * Return the file descriptor which becomes readable when a job has
257  * finished
258  */
259
260 int pthreadpool_signal_fd(struct pthreadpool *pool)
261 {
262         return pool->sig_pipe[0];
263 }
264
265 /*
266  * Do a pthread_join() on all children that have exited, pool->mutex must be
267  * locked
268  */
269 static void pthreadpool_join_children(struct pthreadpool *pool)
270 {
271         int i;
272
273         for (i=0; i<pool->num_exited; i++) {
274                 pthread_join(pool->exited[i], NULL);
275         }
276         pool->num_exited = 0;
277
278         /*
279          * Deliberately not free and NULL pool->exited. That will be
280          * re-used by realloc later.
281          */
282 }
283
284 /*
285  * Fetch a finished job number from the signal pipe
286  */
287
288 int pthreadpool_finished_job(struct pthreadpool *pool)
289 {
290         int result;
291         ssize_t nread;
292
293         nread = -1;
294         errno = EINTR;
295
296         while ((nread == -1) && (errno == EINTR)) {
297                 nread = read(pool->sig_pipe[0], &result, sizeof(int));
298         }
299         if (nread == -1) {
300                 return errno;
301         }
302         if (nread != sizeof(int)) {
303                 return EINVAL;
304         }
305         return result;
306 }
307
308 /*
309  * Destroy a thread pool, finishing all threads working for it
310  */
311
312 int pthreadpool_destroy(struct pthreadpool *pool)
313 {
314         int ret, ret1;
315
316         ret = pthread_mutex_lock(&pool->mutex);
317         if (ret != 0) {
318                 return ret;
319         }
320
321         if ((pool->jobs != NULL) || pool->shutdown) {
322                 ret = pthread_mutex_unlock(&pool->mutex);
323                 assert(ret == 0);
324                 return EBUSY;
325         }
326
327         if (pool->num_threads > 0) {
328                 /*
329                  * We have active threads, tell them to finish, wait for that.
330                  */
331
332                 pool->shutdown = 1;
333
334                 if (pool->num_idle > 0) {
335                         /*
336                          * Wake the idle threads. They will find pool->quit to
337                          * be set and exit themselves
338                          */
339                         ret = pthread_cond_broadcast(&pool->condvar);
340                         if (ret != 0) {
341                                 pthread_mutex_unlock(&pool->mutex);
342                                 return ret;
343                         }
344                 }
345
346                 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
347
348                         if (pool->num_exited > 0) {
349                                 pthreadpool_join_children(pool);
350                                 continue;
351                         }
352                         /*
353                          * A thread that shuts down will also signal
354                          * pool->condvar
355                          */
356                         ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
357                         if (ret != 0) {
358                                 pthread_mutex_unlock(&pool->mutex);
359                                 return ret;
360                         }
361                 }
362         }
363
364         ret = pthread_mutex_unlock(&pool->mutex);
365         if (ret != 0) {
366                 return ret;
367         }
368         ret = pthread_mutex_destroy(&pool->mutex);
369         ret1 = pthread_cond_destroy(&pool->condvar);
370
371         if (ret != 0) {
372                 return ret;
373         }
374         if (ret1 != 0) {
375                 return ret1;
376         }
377
378         ret = pthread_mutex_lock(&pthreadpools_mutex);
379         if (ret != 0) {
380                 return ret;
381         }
382         DLIST_REMOVE(pthreadpools, pool);
383         ret = pthread_mutex_unlock(&pthreadpools_mutex);
384         assert(ret == 0);
385
386         close(pool->sig_pipe[0]);
387         pool->sig_pipe[0] = -1;
388
389         close(pool->sig_pipe[1]);
390         pool->sig_pipe[1] = -1;
391
392         free(pool->exited);
393         free(pool);
394
395         return 0;
396 }
397
398 /*
399  * Prepare for pthread_exit(), pool->mutex must be locked
400  */
401 static void pthreadpool_server_exit(struct pthreadpool *pool)
402 {
403         pthread_t *exited;
404
405         pool->num_threads -= 1;
406
407         exited = (pthread_t *)realloc(
408                 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
409
410         if (exited == NULL) {
411                 /* lost a thread status */
412                 return;
413         }
414         pool->exited = exited;
415
416         pool->exited[pool->num_exited] = pthread_self();
417         pool->num_exited += 1;
418 }
419
420 static void *pthreadpool_server(void *arg)
421 {
422         struct pthreadpool *pool = (struct pthreadpool *)arg;
423         int res;
424
425         res = pthread_mutex_lock(&pool->mutex);
426         if (res != 0) {
427                 return NULL;
428         }
429
430         while (1) {
431                 struct timespec ts;
432                 struct pthreadpool_job *job;
433
434                 /*
435                  * idle-wait at most 1 second. If nothing happens in that
436                  * time, exit this thread.
437                  */
438
439                 clock_gettime(CLOCK_REALTIME, &ts);
440                 ts.tv_sec += 1;
441
442                 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
443
444                         pool->num_idle += 1;
445                         res = pthread_cond_timedwait(
446                                 &pool->condvar, &pool->mutex, &ts);
447                         pool->num_idle -= 1;
448
449                         if (res == ETIMEDOUT) {
450
451                                 if (pool->jobs == NULL) {
452                                         /*
453                                          * we timed out and still no work for
454                                          * us. Exit.
455                                          */
456                                         pthreadpool_server_exit(pool);
457                                         pthread_mutex_unlock(&pool->mutex);
458                                         return NULL;
459                                 }
460
461                                 break;
462                         }
463                         assert(res == 0);
464                 }
465
466                 job = pool->jobs;
467
468                 if (job != NULL) {
469                         ssize_t written;
470
471                         /*
472                          * Ok, there's work for us to do, remove the job from
473                          * the pthreadpool list
474                          */
475                         pool->jobs = job->next;
476                         if (pool->last_job == job) {
477                                 pool->last_job = NULL;
478                         }
479
480                         /*
481                          * Do the work with the mutex unlocked
482                          */
483
484                         res = pthread_mutex_unlock(&pool->mutex);
485                         assert(res == 0);
486
487                         job->fn(job->private_data);
488
489                         written = write(pool->sig_pipe[1], &job->id,
490                                         sizeof(int));
491
492                         free(job);
493
494                         res = pthread_mutex_lock(&pool->mutex);
495                         assert(res == 0);
496
497                         if (written != sizeof(int)) {
498                                 pthreadpool_server_exit(pool);
499                                 pthread_mutex_unlock(&pool->mutex);
500                                 return NULL;
501                         }
502                 }
503
504                 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
505                         /*
506                          * No more work to do and we're asked to shut down, so
507                          * exit
508                          */
509                         pthreadpool_server_exit(pool);
510
511                         if (pool->num_threads == 0) {
512                                 /*
513                                  * Ping the main thread waiting for all of us
514                                  * workers to have quit.
515                                  */
516                                 pthread_cond_broadcast(&pool->condvar);
517                         }
518
519                         pthread_mutex_unlock(&pool->mutex);
520                         return NULL;
521                 }
522         }
523 }
524
525 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
526                         void (*fn)(void *private_data), void *private_data)
527 {
528         struct pthreadpool_job *job;
529         pthread_t thread_id;
530         int res;
531         sigset_t mask, omask;
532
533         job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
534         if (job == NULL) {
535                 return ENOMEM;
536         }
537
538         job->fn = fn;
539         job->private_data = private_data;
540         job->id = job_id;
541         job->next = NULL;
542
543         res = pthread_mutex_lock(&pool->mutex);
544         if (res != 0) {
545                 free(job);
546                 return res;
547         }
548
549         if (pool->shutdown) {
550                 /*
551                  * Protect against the pool being shut down while
552                  * trying to add a job
553                  */
554                 res = pthread_mutex_unlock(&pool->mutex);
555                 assert(res == 0);
556                 free(job);
557                 return EINVAL;
558         }
559
560         /*
561          * Just some cleanup under the mutex
562          */
563         pthreadpool_join_children(pool);
564
565         /*
566          * Add job to the end of the queue
567          */
568         if (pool->jobs == NULL) {
569                 pool->jobs = job;
570         }
571         else {
572                 pool->last_job->next = job;
573         }
574         pool->last_job = job;
575
576         if (pool->num_idle > 0) {
577                 /*
578                  * We have idle threads, wake one.
579                  */
580                 res = pthread_cond_signal(&pool->condvar);
581                 pthread_mutex_unlock(&pool->mutex);
582                 return res;
583         }
584
585         if ((pool->max_threads != 0) &&
586             (pool->num_threads >= pool->max_threads)) {
587                 /*
588                  * No more new threads, we just queue the request
589                  */
590                 pthread_mutex_unlock(&pool->mutex);
591                 return 0;
592         }
593
594         /*
595          * Create a new worker thread. It should not receive any signals.
596          */
597
598         sigfillset(&mask);
599
600         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
601         if (res != 0) {
602                 pthread_mutex_unlock(&pool->mutex);
603                 return res;
604         }
605
606         res = pthread_create(&thread_id, NULL, pthreadpool_server,
607                                 (void *)pool);
608         if (res == 0) {
609                 pool->num_threads += 1;
610         }
611
612         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
613
614         pthread_mutex_unlock(&pool->mutex);
615         return res;
616 }