04303776a2fec11572f4a99ae66d08de7b27f952
[obnox/samba/samba-obnox.git] / source3 / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "config.h"
21 #include <errno.h>
22 #include <stdio.h>
23 #include <stdlib.h>
24 #include <string.h>
25 #include <pthread.h>
26 #include <signal.h>
27 #include <assert.h>
28 #include <fcntl.h>
29 #include "system/time.h"
30 #include "system/filesys.h"
31 #include "replace.h"
32
33 #include "pthreadpool.h"
34 #include "lib/util/dlinklist.h"
35
36 struct pthreadpool_job {
37         struct pthreadpool_job *next;
38         int id;
39         void (*fn)(void *private_data);
40         void *private_data;
41 };
42
43 struct pthreadpool {
44         /*
45          * List pthreadpools for fork safety
46          */
47         struct pthreadpool *prev, *next;
48
49         /*
50          * Control access to this struct
51          */
52         pthread_mutex_t mutex;
53
54         /*
55          * Threads waiting for work do so here
56          */
57         pthread_cond_t condvar;
58
59         /*
60          * List of work jobs
61          */
62         struct pthreadpool_job *jobs, *last_job;
63
64         /*
65          * pipe for signalling
66          */
67         int sig_pipe[2];
68
69         /*
70          * indicator to worker threads that they should shut down
71          */
72         int shutdown;
73
74         /*
75          * maximum number of threads
76          */
77         int max_threads;
78
79         /*
80          * Number of threads
81          */
82         int num_threads;
83
84         /*
85          * Number of idle threads
86          */
87         int num_idle;
88
89         /*
90          * An array of threads that require joining.
91          */
92         int                     num_exited;
93         pthread_t               *exited; /* We alloc more */
94 };
95
96 static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
97 static struct pthreadpool *pthreadpools = NULL;
98 static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;
99
100 static void pthreadpool_prep_atfork(void);
101
102 /*
103  * Initialize a thread pool
104  */
105
106 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult)
107 {
108         struct pthreadpool *pool;
109         int ret;
110
111         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
112         if (pool == NULL) {
113                 return ENOMEM;
114         }
115
116         ret = pipe(pool->sig_pipe);
117         if (ret == -1) {
118                 int err = errno;
119                 free(pool);
120                 return err;
121         }
122
123         ret = pthread_mutex_init(&pool->mutex, NULL);
124         if (ret != 0) {
125                 close(pool->sig_pipe[0]);
126                 close(pool->sig_pipe[1]);
127                 free(pool);
128                 return ret;
129         }
130
131         ret = pthread_cond_init(&pool->condvar, NULL);
132         if (ret != 0) {
133                 pthread_mutex_destroy(&pool->mutex);
134                 close(pool->sig_pipe[0]);
135                 close(pool->sig_pipe[1]);
136                 free(pool);
137                 return ret;
138         }
139
140         pool->shutdown = 0;
141         pool->jobs = pool->last_job = NULL;
142         pool->num_threads = 0;
143         pool->num_exited = 0;
144         pool->exited = NULL;
145         pool->max_threads = max_threads;
146         pool->num_idle = 0;
147
148         ret = pthread_mutex_lock(&pthreadpools_mutex);
149         if (ret != 0) {
150                 pthread_cond_destroy(&pool->condvar);
151                 pthread_mutex_destroy(&pool->mutex);
152                 close(pool->sig_pipe[0]);
153                 close(pool->sig_pipe[1]);
154                 free(pool);
155                 return ret;
156         }
157         DLIST_ADD(pthreadpools, pool);
158
159         ret = pthread_mutex_unlock(&pthreadpools_mutex);
160         assert(ret == 0);
161
162         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
163
164         *presult = pool;
165
166         return 0;
167 }
168
169 static void pthreadpool_prepare(void)
170 {
171         int ret;
172         struct pthreadpool *pool;
173
174         ret = pthread_mutex_lock(&pthreadpools_mutex);
175         assert(ret == 0);
176
177         pool = pthreadpools;
178
179         while (pool != NULL) {
180                 ret = pthread_mutex_lock(&pool->mutex);
181                 assert(ret == 0);
182                 pool = pool->next;
183         }
184 }
185
186 static void pthreadpool_parent(void)
187 {
188         int ret;
189         struct pthreadpool *pool;
190
191         pool = DLIST_TAIL(pthreadpools);
192
193         while (1) {
194                 ret = pthread_mutex_unlock(&pool->mutex);
195                 assert(ret == 0);
196
197                 if (pool == pthreadpools) {
198                         break;
199                 }
200                 pool = pool->prev;
201         }
202
203         ret = pthread_mutex_unlock(&pthreadpools_mutex);
204         assert(ret == 0);
205 }
206
207 static void pthreadpool_child(void)
208 {
209         int ret;
210         struct pthreadpool *pool;
211
212         pool = DLIST_TAIL(pthreadpools);
213
214         while (1) {
215                 close(pool->sig_pipe[0]);
216                 close(pool->sig_pipe[1]);
217
218                 ret = pipe(pool->sig_pipe);
219                 assert(ret == 0);
220
221                 pool->num_threads = 0;
222
223                 pool->num_exited = 0;
224                 free(pool->exited);
225                 pool->exited = NULL;
226
227                 pool->num_idle = 0;
228
229                 while (pool->jobs != NULL) {
230                         struct pthreadpool_job *job;
231                         job = pool->jobs;
232                         pool->jobs = job->next;
233                         free(job);
234                 }
235                 pool->last_job = NULL;
236
237                 ret = pthread_mutex_unlock(&pool->mutex);
238                 assert(ret == 0);
239
240                 if (pool == pthreadpools) {
241                         break;
242                 }
243                 pool = pool->prev;
244         }
245
246         ret = pthread_mutex_unlock(&pthreadpools_mutex);
247         assert(ret == 0);
248 }
249
250 static void pthreadpool_prep_atfork(void)
251 {
252         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
253                        pthreadpool_child);
254 }
255
256 /*
257  * Return the file descriptor which becomes readable when a job has
258  * finished
259  */
260
261 int pthreadpool_signal_fd(struct pthreadpool *pool)
262 {
263         return pool->sig_pipe[0];
264 }
265
266 /*
267  * Do a pthread_join() on all children that have exited, pool->mutex must be
268  * locked
269  */
270 static void pthreadpool_join_children(struct pthreadpool *pool)
271 {
272         int i;
273
274         for (i=0; i<pool->num_exited; i++) {
275                 pthread_join(pool->exited[i], NULL);
276         }
277         pool->num_exited = 0;
278
279         /*
280          * Deliberately not free and NULL pool->exited. That will be
281          * re-used by realloc later.
282          */
283 }
284
285 /*
286  * Fetch a finished job number from the signal pipe
287  */
288
289 int pthreadpool_finished_job(struct pthreadpool *pool, int *jobid)
290 {
291         int ret_jobid;
292         ssize_t nread;
293
294         nread = -1;
295         errno = EINTR;
296
297         while ((nread == -1) && (errno == EINTR)) {
298                 nread = read(pool->sig_pipe[0], &ret_jobid, sizeof(int));
299         }
300         if (nread == -1) {
301                 return errno;
302         }
303         if (nread != sizeof(int)) {
304                 return EINVAL;
305         }
306         *jobid = ret_jobid;
307         return 0;
308 }
309
310 /*
311  * Destroy a thread pool, finishing all threads working for it
312  */
313
314 int pthreadpool_destroy(struct pthreadpool *pool)
315 {
316         int ret, ret1;
317
318         ret = pthread_mutex_lock(&pool->mutex);
319         if (ret != 0) {
320                 return ret;
321         }
322
323         if ((pool->jobs != NULL) || pool->shutdown) {
324                 ret = pthread_mutex_unlock(&pool->mutex);
325                 assert(ret == 0);
326                 return EBUSY;
327         }
328
329         if (pool->num_threads > 0) {
330                 /*
331                  * We have active threads, tell them to finish, wait for that.
332                  */
333
334                 pool->shutdown = 1;
335
336                 if (pool->num_idle > 0) {
337                         /*
338                          * Wake the idle threads. They will find pool->quit to
339                          * be set and exit themselves
340                          */
341                         ret = pthread_cond_broadcast(&pool->condvar);
342                         if (ret != 0) {
343                                 pthread_mutex_unlock(&pool->mutex);
344                                 return ret;
345                         }
346                 }
347
348                 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
349
350                         if (pool->num_exited > 0) {
351                                 pthreadpool_join_children(pool);
352                                 continue;
353                         }
354                         /*
355                          * A thread that shuts down will also signal
356                          * pool->condvar
357                          */
358                         ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
359                         if (ret != 0) {
360                                 pthread_mutex_unlock(&pool->mutex);
361                                 return ret;
362                         }
363                 }
364         }
365
366         ret = pthread_mutex_unlock(&pool->mutex);
367         if (ret != 0) {
368                 return ret;
369         }
370         ret = pthread_mutex_destroy(&pool->mutex);
371         ret1 = pthread_cond_destroy(&pool->condvar);
372
373         if (ret != 0) {
374                 return ret;
375         }
376         if (ret1 != 0) {
377                 return ret1;
378         }
379
380         ret = pthread_mutex_lock(&pthreadpools_mutex);
381         if (ret != 0) {
382                 return ret;
383         }
384         DLIST_REMOVE(pthreadpools, pool);
385         ret = pthread_mutex_unlock(&pthreadpools_mutex);
386         assert(ret == 0);
387
388         close(pool->sig_pipe[0]);
389         pool->sig_pipe[0] = -1;
390
391         close(pool->sig_pipe[1]);
392         pool->sig_pipe[1] = -1;
393
394         free(pool->exited);
395         free(pool);
396
397         return 0;
398 }
399
400 /*
401  * Prepare for pthread_exit(), pool->mutex must be locked
402  */
403 static void pthreadpool_server_exit(struct pthreadpool *pool)
404 {
405         pthread_t *exited;
406
407         pool->num_threads -= 1;
408
409         exited = (pthread_t *)realloc(
410                 pool->exited, sizeof(pthread_t *) * (pool->num_exited + 1));
411
412         if (exited == NULL) {
413                 /* lost a thread status */
414                 return;
415         }
416         pool->exited = exited;
417
418         pool->exited[pool->num_exited] = pthread_self();
419         pool->num_exited += 1;
420 }
421
422 static void *pthreadpool_server(void *arg)
423 {
424         struct pthreadpool *pool = (struct pthreadpool *)arg;
425         int res;
426
427         res = pthread_mutex_lock(&pool->mutex);
428         if (res != 0) {
429                 return NULL;
430         }
431
432         while (1) {
433                 struct timespec ts;
434                 struct pthreadpool_job *job;
435
436                 /*
437                  * idle-wait at most 1 second. If nothing happens in that
438                  * time, exit this thread.
439                  */
440
441                 clock_gettime(CLOCK_REALTIME, &ts);
442                 ts.tv_sec += 1;
443
444                 while ((pool->jobs == NULL) && (pool->shutdown == 0)) {
445
446                         pool->num_idle += 1;
447                         res = pthread_cond_timedwait(
448                                 &pool->condvar, &pool->mutex, &ts);
449                         pool->num_idle -= 1;
450
451                         if (res == ETIMEDOUT) {
452
453                                 if (pool->jobs == NULL) {
454                                         /*
455                                          * we timed out and still no work for
456                                          * us. Exit.
457                                          */
458                                         pthreadpool_server_exit(pool);
459                                         pthread_mutex_unlock(&pool->mutex);
460                                         return NULL;
461                                 }
462
463                                 break;
464                         }
465                         assert(res == 0);
466                 }
467
468                 job = pool->jobs;
469
470                 if (job != NULL) {
471                         ssize_t written;
472
473                         /*
474                          * Ok, there's work for us to do, remove the job from
475                          * the pthreadpool list
476                          */
477                         pool->jobs = job->next;
478                         if (pool->last_job == job) {
479                                 pool->last_job = NULL;
480                         }
481
482                         /*
483                          * Do the work with the mutex unlocked
484                          */
485
486                         res = pthread_mutex_unlock(&pool->mutex);
487                         assert(res == 0);
488
489                         job->fn(job->private_data);
490
491                         written = write(pool->sig_pipe[1], &job->id,
492                                         sizeof(int));
493
494                         free(job);
495
496                         res = pthread_mutex_lock(&pool->mutex);
497                         assert(res == 0);
498
499                         if (written != sizeof(int)) {
500                                 pthreadpool_server_exit(pool);
501                                 pthread_mutex_unlock(&pool->mutex);
502                                 return NULL;
503                         }
504                 }
505
506                 if ((pool->jobs == NULL) && (pool->shutdown != 0)) {
507                         /*
508                          * No more work to do and we're asked to shut down, so
509                          * exit
510                          */
511                         pthreadpool_server_exit(pool);
512
513                         if (pool->num_threads == 0) {
514                                 /*
515                                  * Ping the main thread waiting for all of us
516                                  * workers to have quit.
517                                  */
518                                 pthread_cond_broadcast(&pool->condvar);
519                         }
520
521                         pthread_mutex_unlock(&pool->mutex);
522                         return NULL;
523                 }
524         }
525 }
526
527 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
528                         void (*fn)(void *private_data), void *private_data)
529 {
530         struct pthreadpool_job *job;
531         pthread_t thread_id;
532         int res;
533         sigset_t mask, omask;
534
535         job = (struct pthreadpool_job *)malloc(sizeof(struct pthreadpool_job));
536         if (job == NULL) {
537                 return ENOMEM;
538         }
539
540         job->fn = fn;
541         job->private_data = private_data;
542         job->id = job_id;
543         job->next = NULL;
544
545         res = pthread_mutex_lock(&pool->mutex);
546         if (res != 0) {
547                 free(job);
548                 return res;
549         }
550
551         if (pool->shutdown) {
552                 /*
553                  * Protect against the pool being shut down while
554                  * trying to add a job
555                  */
556                 res = pthread_mutex_unlock(&pool->mutex);
557                 assert(res == 0);
558                 free(job);
559                 return EINVAL;
560         }
561
562         /*
563          * Just some cleanup under the mutex
564          */
565         pthreadpool_join_children(pool);
566
567         /*
568          * Add job to the end of the queue
569          */
570         if (pool->jobs == NULL) {
571                 pool->jobs = job;
572         }
573         else {
574                 pool->last_job->next = job;
575         }
576         pool->last_job = job;
577
578         if (pool->num_idle > 0) {
579                 /*
580                  * We have idle threads, wake one.
581                  */
582                 res = pthread_cond_signal(&pool->condvar);
583                 pthread_mutex_unlock(&pool->mutex);
584                 return res;
585         }
586
587         if ((pool->max_threads != 0) &&
588             (pool->num_threads >= pool->max_threads)) {
589                 /*
590                  * No more new threads, we just queue the request
591                  */
592                 pthread_mutex_unlock(&pool->mutex);
593                 return 0;
594         }
595
596         /*
597          * Create a new worker thread. It should not receive any signals.
598          */
599
600         sigfillset(&mask);
601
602         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
603         if (res != 0) {
604                 pthread_mutex_unlock(&pool->mutex);
605                 return res;
606         }
607
608         res = pthread_create(&thread_id, NULL, pthreadpool_server,
609                                 (void *)pool);
610         if (res == 0) {
611                 pool->num_threads += 1;
612         }
613
614         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
615
616         pthread_mutex_unlock(&pool->mutex);
617         return res;
618 }