pthreadpool: We always want asserts to abort()
[metze/samba/wip.git] / source3 / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "pthreadpool.h"
25 #include "lib/util/dlinklist.h"
26
27 #ifdef NDEBUG
28 #undef NDEBUG
29 #endif
30
31 #include <assert.h>
32
/*
 * One queued unit of work.
 */
struct pthreadpool_job {
        /* Caller-chosen identifier, handed to signal_fn once fn has run */
        int id;

        /* The work itself, executed by a worker thread */
        void (*fn)(void *private_data);

        /* Opaque argument passed to fn */
        void *private_data;
};
38
/*
 * State of one thread pool. Apart from the list linkage, all members are
 * accessed with "mutex" held.
 */
struct pthreadpool {
        /*
         * Linkage in the global list of pools; the atfork handlers walk
         * that list.
         */
        struct pthreadpool *prev;
        struct pthreadpool *next;

        /*
         * Serializes access to this struct
         */
        pthread_mutex_t mutex;

        /*
         * Idle worker threads sleep on this condition variable.
         * pthreadpool_destroy() also waits on it for workers to exit.
         */
        pthread_cond_t condvar;

        /*
         * Job queue: "jobs" is an array of "jobs_array_len" slots used
         * as a ring buffer. "head" is the index of the oldest queued job,
         * "num_jobs" the number of jobs currently queued.
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

        size_t head;
        size_t num_jobs;

        /*
         * Job completion callback. A worker calls it after a job function
         * has returned; a non-zero return value makes that worker exit.
         */
        int (*signal_fn)(int jobid,
                         void (*job_fn)(void *private_data),
                         void *job_fn_private_data,
                         void *private_data);
        void *signal_fn_private_data;

        /*
         * Non-zero once worker threads are asked to shut down
         */
        int shutdown;

        /*
         * Upper limit for the number of threads, 0 means no limit
         */
        int max_threads;

        /*
         * Number of threads currently belonging to the pool
         */
        int num_threads;

        /*
         * Number of threads currently waiting for work
         */
        int num_idle;

        /*
         * Thread ids of workers that have exited and still need a
         * pthread_join(). "num_exited" entries are valid.
         */
        int                     num_exited;
        pthread_t               *exited; /* grown with realloc as needed */
};
99
/*
 * All existing pools, linked via their prev/next members. The list is
 * protected by pthreadpools_mutex and walked by the atfork handlers.
 */
static struct pthreadpool *pthreadpools = NULL;
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;

/*
 * Used with pthread_once() so that the atfork handlers get installed
 * exactly once.
 */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

static void pthreadpool_prep_atfork(void);
105
106 /*
107  * Initialize a thread pool
108  */
109
110 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
111                      int (*signal_fn)(int jobid,
112                                       void (*job_fn)(void *private_data),
113                                       void *job_fn_private_data,
114                                       void *private_data),
115                      void *signal_fn_private_data)
116 {
117         struct pthreadpool *pool;
118         int ret;
119
120         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
121         if (pool == NULL) {
122                 return ENOMEM;
123         }
124         pool->signal_fn = signal_fn;
125         pool->signal_fn_private_data = signal_fn_private_data;
126
127         pool->jobs_array_len = 4;
128         pool->jobs = calloc(
129                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
130
131         if (pool->jobs == NULL) {
132                 free(pool);
133                 return ENOMEM;
134         }
135
136         pool->head = pool->num_jobs = 0;
137
138         ret = pthread_mutex_init(&pool->mutex, NULL);
139         if (ret != 0) {
140                 free(pool->jobs);
141                 free(pool);
142                 return ret;
143         }
144
145         ret = pthread_cond_init(&pool->condvar, NULL);
146         if (ret != 0) {
147                 pthread_mutex_destroy(&pool->mutex);
148                 free(pool->jobs);
149                 free(pool);
150                 return ret;
151         }
152
153         pool->shutdown = 0;
154         pool->num_threads = 0;
155         pool->num_exited = 0;
156         pool->exited = NULL;
157         pool->max_threads = max_threads;
158         pool->num_idle = 0;
159
160         ret = pthread_mutex_lock(&pthreadpools_mutex);
161         if (ret != 0) {
162                 pthread_cond_destroy(&pool->condvar);
163                 pthread_mutex_destroy(&pool->mutex);
164                 free(pool->jobs);
165                 free(pool);
166                 return ret;
167         }
168         DLIST_ADD(pthreadpools, pool);
169
170         ret = pthread_mutex_unlock(&pthreadpools_mutex);
171         assert(ret == 0);
172
173         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
174
175         *presult = pool;
176
177         return 0;
178 }
179
180 static void pthreadpool_prepare(void)
181 {
182         int ret;
183         struct pthreadpool *pool;
184
185         ret = pthread_mutex_lock(&pthreadpools_mutex);
186         assert(ret == 0);
187
188         pool = pthreadpools;
189
190         while (pool != NULL) {
191                 ret = pthread_mutex_lock(&pool->mutex);
192                 assert(ret == 0);
193                 pool = pool->next;
194         }
195 }
196
197 static void pthreadpool_parent(void)
198 {
199         int ret;
200         struct pthreadpool *pool;
201
202         for (pool = DLIST_TAIL(pthreadpools);
203              pool != NULL;
204              pool = DLIST_PREV(pool)) {
205                 ret = pthread_mutex_unlock(&pool->mutex);
206                 assert(ret == 0);
207         }
208
209         ret = pthread_mutex_unlock(&pthreadpools_mutex);
210         assert(ret == 0);
211 }
212
213 static void pthreadpool_child(void)
214 {
215         int ret;
216         struct pthreadpool *pool;
217
218         for (pool = DLIST_TAIL(pthreadpools);
219              pool != NULL;
220              pool = DLIST_PREV(pool)) {
221
222                 pool->num_threads = 0;
223
224                 pool->num_exited = 0;
225                 free(pool->exited);
226                 pool->exited = NULL;
227
228                 pool->num_idle = 0;
229                 pool->head = 0;
230                 pool->num_jobs = 0;
231
232                 ret = pthread_mutex_unlock(&pool->mutex);
233                 assert(ret == 0);
234         }
235
236         ret = pthread_mutex_unlock(&pthreadpools_mutex);
237         assert(ret == 0);
238 }
239
240 static void pthreadpool_prep_atfork(void)
241 {
242         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
243                        pthreadpool_child);
244 }
245
246 /*
247  * Do a pthread_join() on all children that have exited, pool->mutex must be
248  * locked
249  */
250 static void pthreadpool_join_children(struct pthreadpool *pool)
251 {
252         int i;
253
254         for (i=0; i<pool->num_exited; i++) {
255                 int ret;
256
257                 ret = pthread_join(pool->exited[i], NULL);
258                 if (ret != 0) {
259                         /*
260                          * Severe internal error, we can't do much but
261                          * abort here.
262                          */
263                         abort();
264                 }
265         }
266         pool->num_exited = 0;
267
268         /*
269          * Deliberately not free and NULL pool->exited. That will be
270          * re-used by realloc later.
271          */
272 }
273
274 /*
275  * Destroy a thread pool, finishing all threads working for it
276  */
277
278 int pthreadpool_destroy(struct pthreadpool *pool)
279 {
280         int ret, ret1;
281
282         ret = pthread_mutex_lock(&pool->mutex);
283         if (ret != 0) {
284                 return ret;
285         }
286
287         if ((pool->num_jobs != 0) || pool->shutdown) {
288                 ret = pthread_mutex_unlock(&pool->mutex);
289                 assert(ret == 0);
290                 return EBUSY;
291         }
292
293         if (pool->num_threads > 0) {
294                 /*
295                  * We have active threads, tell them to finish, wait for that.
296                  */
297
298                 pool->shutdown = 1;
299
300                 if (pool->num_idle > 0) {
301                         /*
302                          * Wake the idle threads. They will find
303                          * pool->shutdown to be set and exit themselves
304                          */
305                         ret = pthread_cond_broadcast(&pool->condvar);
306                         if (ret != 0) {
307                                 pthread_mutex_unlock(&pool->mutex);
308                                 return ret;
309                         }
310                 }
311
312                 while ((pool->num_threads > 0) || (pool->num_exited > 0)) {
313
314                         if (pool->num_exited > 0) {
315                                 pthreadpool_join_children(pool);
316                                 continue;
317                         }
318                         /*
319                          * A thread that shuts down will also signal
320                          * pool->condvar
321                          */
322                         ret = pthread_cond_wait(&pool->condvar, &pool->mutex);
323                         if (ret != 0) {
324                                 pthread_mutex_unlock(&pool->mutex);
325                                 return ret;
326                         }
327                 }
328         }
329
330         ret = pthread_mutex_unlock(&pool->mutex);
331         if (ret != 0) {
332                 return ret;
333         }
334         ret = pthread_mutex_destroy(&pool->mutex);
335         ret1 = pthread_cond_destroy(&pool->condvar);
336
337         if (ret != 0) {
338                 return ret;
339         }
340         if (ret1 != 0) {
341                 return ret1;
342         }
343
344         ret = pthread_mutex_lock(&pthreadpools_mutex);
345         if (ret != 0) {
346                 return ret;
347         }
348         DLIST_REMOVE(pthreadpools, pool);
349         ret = pthread_mutex_unlock(&pthreadpools_mutex);
350         assert(ret == 0);
351
352         free(pool->exited);
353         free(pool->jobs);
354         free(pool);
355
356         return 0;
357 }
358
359 /*
360  * Prepare for pthread_exit(), pool->mutex must be locked
361  */
362 static void pthreadpool_server_exit(struct pthreadpool *pool)
363 {
364         pthread_t *exited;
365
366         pool->num_threads -= 1;
367
368         exited = (pthread_t *)realloc(
369                 pool->exited, sizeof(pthread_t) * (pool->num_exited + 1));
370
371         if (exited == NULL) {
372                 /* lost a thread status */
373                 return;
374         }
375         pool->exited = exited;
376
377         pool->exited[pool->num_exited] = pthread_self();
378         pool->num_exited += 1;
379 }
380
381 static bool pthreadpool_get_job(struct pthreadpool *p,
382                                 struct pthreadpool_job *job)
383 {
384         if (p->num_jobs == 0) {
385                 return false;
386         }
387         *job = p->jobs[p->head];
388         p->head = (p->head+1) % p->jobs_array_len;
389         p->num_jobs -= 1;
390         return true;
391 }
392
393 static bool pthreadpool_put_job(struct pthreadpool *p,
394                                 int id,
395                                 void (*fn)(void *private_data),
396                                 void *private_data)
397 {
398         struct pthreadpool_job *job;
399
400         if (p->num_jobs == p->jobs_array_len) {
401                 struct pthreadpool_job *tmp;
402                 size_t new_len = p->jobs_array_len * 2;
403
404                 tmp = realloc(
405                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
406                 if (tmp == NULL) {
407                         return false;
408                 }
409                 p->jobs = tmp;
410
411                 /*
412                  * We just doubled the jobs array. The array implements a FIFO
413                  * queue with a modulo-based wraparound, so we have to memcpy
414                  * the jobs that are logically at the queue end but physically
415                  * before the queue head into the reallocated area. The new
416                  * space starts at the current jobs_array_len, and we have to
417                  * copy everything before the current head job into the new
418                  * area.
419                  */
420                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
421                        sizeof(struct pthreadpool_job) * p->head);
422
423                 p->jobs_array_len = new_len;
424         }
425
426         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
427         job->id = id;
428         job->fn = fn;
429         job->private_data = private_data;
430
431         p->num_jobs += 1;
432
433         return true;
434 }
435
436 static void *pthreadpool_server(void *arg)
437 {
438         struct pthreadpool *pool = (struct pthreadpool *)arg;
439         int res;
440
441         res = pthread_mutex_lock(&pool->mutex);
442         if (res != 0) {
443                 return NULL;
444         }
445
446         while (1) {
447                 struct timespec ts;
448                 struct pthreadpool_job job;
449
450                 /*
451                  * idle-wait at most 1 second. If nothing happens in that
452                  * time, exit this thread.
453                  */
454
455                 clock_gettime(CLOCK_REALTIME, &ts);
456                 ts.tv_sec += 1;
457
458                 while ((pool->num_jobs == 0) && (pool->shutdown == 0)) {
459
460                         pool->num_idle += 1;
461                         res = pthread_cond_timedwait(
462                                 &pool->condvar, &pool->mutex, &ts);
463                         pool->num_idle -= 1;
464
465                         if (res == ETIMEDOUT) {
466
467                                 if (pool->num_jobs == 0) {
468                                         /*
469                                          * we timed out and still no work for
470                                          * us. Exit.
471                                          */
472                                         pthreadpool_server_exit(pool);
473                                         pthread_mutex_unlock(&pool->mutex);
474                                         return NULL;
475                                 }
476
477                                 break;
478                         }
479                         assert(res == 0);
480                 }
481
482                 if (pthreadpool_get_job(pool, &job)) {
483                         int ret;
484
485                         /*
486                          * Do the work with the mutex unlocked
487                          */
488
489                         res = pthread_mutex_unlock(&pool->mutex);
490                         assert(res == 0);
491
492                         job.fn(job.private_data);
493
494                         res = pthread_mutex_lock(&pool->mutex);
495                         assert(res == 0);
496
497                         ret = pool->signal_fn(job.id,
498                                               job.fn, job.private_data,
499                                               pool->signal_fn_private_data);
500                         if (ret != 0) {
501                                 pthreadpool_server_exit(pool);
502                                 pthread_mutex_unlock(&pool->mutex);
503                                 return NULL;
504                         }
505                 }
506
507                 if ((pool->num_jobs == 0) && (pool->shutdown != 0)) {
508                         /*
509                          * No more work to do and we're asked to shut down, so
510                          * exit
511                          */
512                         pthreadpool_server_exit(pool);
513
514                         if (pool->num_threads == 0) {
515                                 /*
516                                  * Ping the main thread waiting for all of us
517                                  * workers to have quit.
518                                  */
519                                 pthread_cond_broadcast(&pool->condvar);
520                         }
521
522                         pthread_mutex_unlock(&pool->mutex);
523                         return NULL;
524                 }
525         }
526 }
527
528 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
529                         void (*fn)(void *private_data), void *private_data)
530 {
531         pthread_t thread_id;
532         int res;
533         sigset_t mask, omask;
534
535         res = pthread_mutex_lock(&pool->mutex);
536         if (res != 0) {
537                 return res;
538         }
539
540         if (pool->shutdown) {
541                 /*
542                  * Protect against the pool being shut down while
543                  * trying to add a job
544                  */
545                 res = pthread_mutex_unlock(&pool->mutex);
546                 assert(res == 0);
547                 return EINVAL;
548         }
549
550         /*
551          * Just some cleanup under the mutex
552          */
553         pthreadpool_join_children(pool);
554
555         /*
556          * Add job to the end of the queue
557          */
558         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
559                 pthread_mutex_unlock(&pool->mutex);
560                 return ENOMEM;
561         }
562
563         if (pool->num_idle > 0) {
564                 /*
565                  * We have idle threads, wake one.
566                  */
567                 res = pthread_cond_signal(&pool->condvar);
568                 pthread_mutex_unlock(&pool->mutex);
569                 return res;
570         }
571
572         if ((pool->max_threads != 0) &&
573             (pool->num_threads >= pool->max_threads)) {
574                 /*
575                  * No more new threads, we just queue the request
576                  */
577                 pthread_mutex_unlock(&pool->mutex);
578                 return 0;
579         }
580
581         /*
582          * Create a new worker thread. It should not receive any signals.
583          */
584
585         sigfillset(&mask);
586
587         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
588         if (res != 0) {
589                 pthread_mutex_unlock(&pool->mutex);
590                 return res;
591         }
592
593         res = pthread_create(&thread_id, NULL, pthreadpool_server,
594                                 (void *)pool);
595         if (res == 0) {
596                 pool->num_threads += 1;
597         }
598
599         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
600
601         pthread_mutex_unlock(&pool->mutex);
602         return res;
603 }