pthreadpool: Remove wrong comment.
[metze/samba/wip.git] / lib / pthreadpool / pthreadpool.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * thread pool implementation
4  * Copyright (C) Volker Lendecke 2009
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "system/time.h"
22 #include "system/wait.h"
23 #include "system/threads.h"
24 #include "system/filesys.h"
25 #include "pthreadpool.h"
26 #include "lib/util/dlinklist.h"
27
28 #ifdef NDEBUG
29 #undef NDEBUG
30 #endif
31
32 #include <assert.h>
33
/*
 * One entry of the job queue: an integer id together with the
 * function to call and the argument to call it with.
 */
struct pthreadpool_job {
        int id;                         /* job identifier */
        void (*fn)(void *private_data); /* function to run */
        void *private_data;             /* argument handed to fn */
};
39
struct pthreadpool {
        /*
         * Linkage in the list of pthreadpools, kept for fork safety
         */
        struct pthreadpool *prev;
        struct pthreadpool *next;

        /*
         * Serializes access to the members of this struct
         */
        pthread_mutex_t mutex;

        /*
         * Worker threads without work wait on this condvar
         */
        pthread_cond_t condvar;

        /*
         * Storage for the queued jobs and its allocated length
         */
        size_t jobs_array_len;
        struct pthreadpool_job *jobs;

        /*
         * Index of the next job to run and number of queued jobs
         */
        size_t head;
        size_t num_jobs;

        /*
         * Callback used to report job completion, plus the opaque
         * pointer passed to it as last argument
         */
        int (*signal_fn)(int jobid,
                         void (*job_fn)(void *private_data),
                         void *job_fn_private_data,
                         void *private_data);
        void *signal_fn_private_data;

        /*
         * Tells the worker threads to stop processing further jobs
         * and exit.
         */
        bool stopped;

        /*
         * Tells the last worker thread to free the pool resources.
         */
        bool destroyed;

        /*
         * Upper limit for the number of threads.
         * 0 means no real thread, only strict sync processing.
         */
        unsigned max_threads;

        /*
         * Current number of threads
         */
        unsigned num_threads;

        /*
         * Current number of idle threads
         */
        unsigned num_idle;

        /*
         * Set while a fork() is being prepared: helper threads
         * should quickly leave pool->condvar so that fork() can
         * proceed without anybody waiting on it.
         */
        pthread_cond_t *prefork_cond;

        /*
         * Parking spot for helper threads while fork is running.
         * The forking thread holds this mutex, all idle helper
         * threads block on it until the forking thread unlocks it
         * again after the fork.
         */
        pthread_mutex_t fork_mutex;
};
117
/* Protects the pthreadpools list below */
static pthread_mutex_t pthreadpools_mutex = PTHREAD_MUTEX_INITIALIZER;
/* List of all pools, linked through their prev/next members */
static struct pthreadpool *pthreadpools = NULL;
/* Makes sure the fork handlers are registered only once */
static pthread_once_t pthreadpool_atfork_initialized = PTHREAD_ONCE_INIT;

/* Registers the fork handlers, run via pthread_once() */
static void pthreadpool_prep_atfork(void);
123
124 /*
125  * Initialize a thread pool
126  */
127
128 int pthreadpool_init(unsigned max_threads, struct pthreadpool **presult,
129                      int (*signal_fn)(int jobid,
130                                       void (*job_fn)(void *private_data),
131                                       void *job_fn_private_data,
132                                       void *private_data),
133                      void *signal_fn_private_data)
134 {
135         struct pthreadpool *pool;
136         int ret;
137
138         pool = (struct pthreadpool *)malloc(sizeof(struct pthreadpool));
139         if (pool == NULL) {
140                 return ENOMEM;
141         }
142         pool->signal_fn = signal_fn;
143         pool->signal_fn_private_data = signal_fn_private_data;
144
145         pool->jobs_array_len = 4;
146         pool->jobs = calloc(
147                 pool->jobs_array_len, sizeof(struct pthreadpool_job));
148
149         if (pool->jobs == NULL) {
150                 free(pool);
151                 return ENOMEM;
152         }
153
154         pool->head = pool->num_jobs = 0;
155
156         ret = pthread_mutex_init(&pool->mutex, NULL);
157         if (ret != 0) {
158                 free(pool->jobs);
159                 free(pool);
160                 return ret;
161         }
162
163         ret = pthread_cond_init(&pool->condvar, NULL);
164         if (ret != 0) {
165                 pthread_mutex_destroy(&pool->mutex);
166                 free(pool->jobs);
167                 free(pool);
168                 return ret;
169         }
170
171         ret = pthread_mutex_init(&pool->fork_mutex, NULL);
172         if (ret != 0) {
173                 pthread_cond_destroy(&pool->condvar);
174                 pthread_mutex_destroy(&pool->mutex);
175                 free(pool->jobs);
176                 free(pool);
177                 return ret;
178         }
179
180         pool->stopped = false;
181         pool->destroyed = false;
182         pool->num_threads = 0;
183         pool->max_threads = max_threads;
184         pool->num_idle = 0;
185         pool->prefork_cond = NULL;
186
187         ret = pthread_mutex_lock(&pthreadpools_mutex);
188         if (ret != 0) {
189                 pthread_mutex_destroy(&pool->fork_mutex);
190                 pthread_cond_destroy(&pool->condvar);
191                 pthread_mutex_destroy(&pool->mutex);
192                 free(pool->jobs);
193                 free(pool);
194                 return ret;
195         }
196         DLIST_ADD(pthreadpools, pool);
197
198         ret = pthread_mutex_unlock(&pthreadpools_mutex);
199         assert(ret == 0);
200
201         pthread_once(&pthreadpool_atfork_initialized, pthreadpool_prep_atfork);
202
203         *presult = pool;
204
205         return 0;
206 }
207
208 size_t pthreadpool_max_threads(struct pthreadpool *pool)
209 {
210         if (pool->stopped) {
211                 return 0;
212         }
213
214         return pool->max_threads;
215 }
216
217 size_t pthreadpool_queued_jobs(struct pthreadpool *pool)
218 {
219         int res;
220         int unlock_res;
221         size_t ret;
222
223         if (pool->stopped) {
224                 return 0;
225         }
226
227         res = pthread_mutex_lock(&pool->mutex);
228         if (res != 0) {
229                 return res;
230         }
231
232         if (pool->stopped) {
233                 unlock_res = pthread_mutex_unlock(&pool->mutex);
234                 assert(unlock_res == 0);
235                 return 0;
236         }
237
238         ret = pool->num_jobs;
239
240         unlock_res = pthread_mutex_unlock(&pool->mutex);
241         assert(unlock_res == 0);
242         return ret;
243 }
244
245 static void pthreadpool_prepare_pool(struct pthreadpool *pool)
246 {
247         int ret;
248
249         ret = pthread_mutex_lock(&pool->fork_mutex);
250         assert(ret == 0);
251
252         ret = pthread_mutex_lock(&pool->mutex);
253         assert(ret == 0);
254
255         while (pool->num_idle != 0) {
256                 unsigned num_idle = pool->num_idle;
257                 pthread_cond_t prefork_cond;
258
259                 ret = pthread_cond_init(&prefork_cond, NULL);
260                 assert(ret == 0);
261
262                 /*
263                  * Push all idle threads off pool->condvar. In the
264                  * child we can destroy the pool, which would result
265                  * in undefined behaviour in the
266                  * pthread_cond_destroy(pool->condvar). glibc just
267                  * blocks here.
268                  */
269                 pool->prefork_cond = &prefork_cond;
270
271                 ret = pthread_cond_signal(&pool->condvar);
272                 assert(ret == 0);
273
274                 while (pool->num_idle == num_idle) {
275                         ret = pthread_cond_wait(&prefork_cond, &pool->mutex);
276                         assert(ret == 0);
277                 }
278
279                 pool->prefork_cond = NULL;
280
281                 ret = pthread_cond_destroy(&prefork_cond);
282                 assert(ret == 0);
283         }
284
285         /*
286          * Probably it's well-defined somewhere: What happens to
287          * condvars after a fork? The rationale of pthread_atfork only
288          * writes about mutexes. So better be safe than sorry and
289          * destroy/reinit pool->condvar across a fork.
290          */
291
292         ret = pthread_cond_destroy(&pool->condvar);
293         assert(ret == 0);
294 }
295
296 static void pthreadpool_prepare(void)
297 {
298         int ret;
299         struct pthreadpool *pool;
300
301         ret = pthread_mutex_lock(&pthreadpools_mutex);
302         assert(ret == 0);
303
304         pool = pthreadpools;
305
306         while (pool != NULL) {
307                 pthreadpool_prepare_pool(pool);
308                 pool = pool->next;
309         }
310 }
311
312 static void pthreadpool_parent(void)
313 {
314         int ret;
315         struct pthreadpool *pool;
316
317         for (pool = DLIST_TAIL(pthreadpools);
318              pool != NULL;
319              pool = DLIST_PREV(pool)) {
320                 ret = pthread_cond_init(&pool->condvar, NULL);
321                 assert(ret == 0);
322                 ret = pthread_mutex_unlock(&pool->mutex);
323                 assert(ret == 0);
324                 ret = pthread_mutex_unlock(&pool->fork_mutex);
325                 assert(ret == 0);
326         }
327
328         ret = pthread_mutex_unlock(&pthreadpools_mutex);
329         assert(ret == 0);
330 }
331
332 static void pthreadpool_child(void)
333 {
334         int ret;
335         struct pthreadpool *pool;
336
337         for (pool = DLIST_TAIL(pthreadpools);
338              pool != NULL;
339              pool = DLIST_PREV(pool)) {
340
341                 pool->num_threads = 0;
342                 pool->num_idle = 0;
343                 pool->head = 0;
344                 pool->num_jobs = 0;
345                 pool->stopped = true;
346
347                 ret = pthread_cond_init(&pool->condvar, NULL);
348                 assert(ret == 0);
349
350                 ret = pthread_mutex_unlock(&pool->mutex);
351                 assert(ret == 0);
352
353                 ret = pthread_mutex_unlock(&pool->fork_mutex);
354                 assert(ret == 0);
355         }
356
357         ret = pthread_mutex_unlock(&pthreadpools_mutex);
358         assert(ret == 0);
359 }
360
361 static void pthreadpool_prep_atfork(void)
362 {
363         pthread_atfork(pthreadpool_prepare, pthreadpool_parent,
364                        pthreadpool_child);
365 }
366
367 static int pthreadpool_free(struct pthreadpool *pool)
368 {
369         int ret, ret1, ret2;
370
371         ret = pthread_mutex_lock(&pthreadpools_mutex);
372         if (ret != 0) {
373                 return ret;
374         }
375         DLIST_REMOVE(pthreadpools, pool);
376         ret = pthread_mutex_unlock(&pthreadpools_mutex);
377         assert(ret == 0);
378
379         ret = pthread_mutex_lock(&pool->mutex);
380         assert(ret == 0);
381         ret = pthread_mutex_unlock(&pool->mutex);
382         assert(ret == 0);
383
384         ret = pthread_mutex_destroy(&pool->mutex);
385         ret1 = pthread_cond_destroy(&pool->condvar);
386         ret2 = pthread_mutex_destroy(&pool->fork_mutex);
387
388         if (ret != 0) {
389                 return ret;
390         }
391         if (ret1 != 0) {
392                 return ret1;
393         }
394         if (ret2 != 0) {
395                 return ret2;
396         }
397
398         free(pool->jobs);
399         free(pool);
400
401         return 0;
402 }
403
404 /*
405  * Stop a thread pool. Wake up all idle threads for exit.
406  */
407
408 static int pthreadpool_stop_locked(struct pthreadpool *pool)
409 {
410         int ret;
411
412         pool->stopped = true;
413
414         if (pool->num_threads == 0) {
415                 return 0;
416         }
417
418         /*
419          * We have active threads, tell them to finish.
420          */
421
422         ret = pthread_cond_broadcast(&pool->condvar);
423
424         return ret;
425 }
426
427 /*
428  * Stop a thread pool. Wake up all idle threads for exit.
429  */
430
431 int pthreadpool_stop(struct pthreadpool *pool)
432 {
433         int ret, ret1;
434
435         ret = pthread_mutex_lock(&pool->mutex);
436         if (ret != 0) {
437                 return ret;
438         }
439
440         if (!pool->stopped) {
441                 ret = pthreadpool_stop_locked(pool);
442         }
443
444         ret1 = pthread_mutex_unlock(&pool->mutex);
445         assert(ret1 == 0);
446
447         return ret;
448 }
449
450 /*
451  * Destroy a thread pool. Wake up all idle threads for exit. The last
452  * one will free the pool.
453  */
454
455 int pthreadpool_destroy(struct pthreadpool *pool)
456 {
457         int ret, ret1;
458         bool free_it;
459
460         assert(!pool->destroyed);
461
462         ret = pthread_mutex_lock(&pool->mutex);
463         if (ret != 0) {
464                 return ret;
465         }
466
467         pool->destroyed = true;
468
469         if (!pool->stopped) {
470                 ret = pthreadpool_stop_locked(pool);
471         }
472
473         free_it = (pool->num_threads == 0);
474
475         ret1 = pthread_mutex_unlock(&pool->mutex);
476         assert(ret1 == 0);
477
478         if (free_it) {
479                 pthreadpool_free(pool);
480         }
481
482         return ret;
483 }
484 /*
485  * Prepare for pthread_exit(), pool->mutex must be locked and will be
486  * unlocked here. This is a bit of a layering violation, but here we
487  * also take care of removing the pool if we're the last thread.
488  */
489 static void pthreadpool_server_exit(struct pthreadpool *pool)
490 {
491         int ret;
492         bool free_it;
493
494         pool->num_threads -= 1;
495
496         free_it = (pool->destroyed && (pool->num_threads == 0));
497
498         ret = pthread_mutex_unlock(&pool->mutex);
499         assert(ret == 0);
500
501         if (free_it) {
502                 pthreadpool_free(pool);
503         }
504 }
505
506 static bool pthreadpool_get_job(struct pthreadpool *p,
507                                 struct pthreadpool_job *job)
508 {
509         if (p->stopped) {
510                 return false;
511         }
512
513         if (p->num_jobs == 0) {
514                 return false;
515         }
516         *job = p->jobs[p->head];
517         p->head = (p->head+1) % p->jobs_array_len;
518         p->num_jobs -= 1;
519         return true;
520 }
521
522 static bool pthreadpool_put_job(struct pthreadpool *p,
523                                 int id,
524                                 void (*fn)(void *private_data),
525                                 void *private_data)
526 {
527         struct pthreadpool_job *job;
528
529         if (p->num_jobs == p->jobs_array_len) {
530                 struct pthreadpool_job *tmp;
531                 size_t new_len = p->jobs_array_len * 2;
532
533                 tmp = realloc(
534                         p->jobs, sizeof(struct pthreadpool_job) * new_len);
535                 if (tmp == NULL) {
536                         return false;
537                 }
538                 p->jobs = tmp;
539
540                 /*
541                  * We just doubled the jobs array. The array implements a FIFO
542                  * queue with a modulo-based wraparound, so we have to memcpy
543                  * the jobs that are logically at the queue end but physically
544                  * before the queue head into the reallocated area. The new
545                  * space starts at the current jobs_array_len, and we have to
546                  * copy everything before the current head job into the new
547                  * area.
548                  */
549                 memcpy(&p->jobs[p->jobs_array_len], p->jobs,
550                        sizeof(struct pthreadpool_job) * p->head);
551
552                 p->jobs_array_len = new_len;
553         }
554
555         job = &p->jobs[(p->head + p->num_jobs) % p->jobs_array_len];
556         job->id = id;
557         job->fn = fn;
558         job->private_data = private_data;
559
560         p->num_jobs += 1;
561
562         return true;
563 }
564
565 static void pthreadpool_undo_put_job(struct pthreadpool *p)
566 {
567         p->num_jobs -= 1;
568 }
569
/*
 * Main function of a worker thread. arg is the struct pthreadpool the
 * thread belongs to. Always returns NULL.
 *
 * The thread runs queued jobs until the pool is stopped, the
 * completion callback returns non-zero, or no job showed up within
 * the idle timeout. pool->mutex is held except while a job and its
 * completion callback run and while parked for a fork. All regular
 * exits go through pthreadpool_server_exit(); only a failure of the
 * very first mutex lock returns directly.
 */
static void *pthreadpool_server(void *arg)
{
        struct pthreadpool *pool = (struct pthreadpool *)arg;
        int res;

        res = pthread_mutex_lock(&pool->mutex);
        if (res != 0) {
                /* note: pool->num_threads is not adjusted on this path */
                return NULL;
        }

        while (1) {
                struct timespec ts;
                struct pthreadpool_job job;

                /*
                 * idle-wait at most 1 second. If nothing happens in that
                 * time, exit this thread.
                 */

                clock_gettime(CLOCK_REALTIME, &ts);
                ts.tv_sec += 1;

                while ((pool->num_jobs == 0) && !pool->stopped) {

                        /* count ourselves as idle only while waiting */
                        pool->num_idle += 1;
                        res = pthread_cond_timedwait(
                                &pool->condvar, &pool->mutex, &ts);
                        pool->num_idle -= 1;

                        if (pool->prefork_cond != NULL) {
                                /*
                                 * We must allow fork() to continue
                                 * without anybody waiting on
                                 * &pool->condvar. Tell
                                 * pthreadpool_prepare_pool that we
                                 * got that message.
                                 */

                                res = pthread_cond_signal(pool->prefork_cond);
                                assert(res == 0);

                                res = pthread_mutex_unlock(&pool->mutex);
                                assert(res == 0);

                                /*
                                 * pthreadpool_prepare_pool has
                                 * already locked this mutex across
                                 * the fork. This makes us wait
                                 * without sitting in a condvar.
                                 */
                                res = pthread_mutex_lock(&pool->fork_mutex);
                                assert(res == 0);
                                res = pthread_mutex_unlock(&pool->fork_mutex);
                                assert(res == 0);

                                /*
                                 * res now holds the result of this
                                 * lock call, the result of the
                                 * timedwait above is gone.
                                 */
                                res = pthread_mutex_lock(&pool->mutex);
                                assert(res == 0);
                        }

                        if (res == ETIMEDOUT) {

                                if (pool->num_jobs == 0) {
                                        /*
                                         * we timed out and still no work for
                                         * us. Exit.
                                         */
                                        pthreadpool_server_exit(pool);
                                        return NULL;
                                }

                                /* timed out, but a job is queued */
                                break;
                        }
                        assert(res == 0);
                }

                if (pthreadpool_get_job(pool, &job)) {
                        int ret;

                        /*
                         * Do the work with the mutex unlocked
                         */

                        res = pthread_mutex_unlock(&pool->mutex);
                        assert(res == 0);

                        job.fn(job.private_data);

                        /* report completion, still without the mutex */
                        ret = pool->signal_fn(job.id,
                                              job.fn, job.private_data,
                                              pool->signal_fn_private_data);

                        res = pthread_mutex_lock(&pool->mutex);
                        assert(res == 0);

                        if (ret != 0) {
                                /* completion could not be signalled, exit */
                                pthreadpool_server_exit(pool);
                                return NULL;
                        }
                }

                if (pool->stopped) {
                        /*
                         * we're asked to stop processing jobs, so exit
                         */
                        pthreadpool_server_exit(pool);
                        return NULL;
                }
        }
}
679
680 static int pthreadpool_create_thread(struct pthreadpool *pool)
681 {
682         pthread_attr_t thread_attr;
683         pthread_t thread_id;
684         int res;
685         sigset_t mask, omask;
686
687         /*
688          * Create a new worker thread. It should not receive any signals.
689          */
690
691         sigfillset(&mask);
692
693         res = pthread_attr_init(&thread_attr);
694         if (res != 0) {
695                 return res;
696         }
697
698         res = pthread_attr_setdetachstate(
699                 &thread_attr, PTHREAD_CREATE_DETACHED);
700         if (res != 0) {
701                 pthread_attr_destroy(&thread_attr);
702                 return res;
703         }
704
705         res = pthread_sigmask(SIG_BLOCK, &mask, &omask);
706         if (res != 0) {
707                 pthread_attr_destroy(&thread_attr);
708                 return res;
709         }
710
711         res = pthread_create(&thread_id, &thread_attr, pthreadpool_server,
712                              (void *)pool);
713
714         assert(pthread_sigmask(SIG_SETMASK, &omask, NULL) == 0);
715
716         pthread_attr_destroy(&thread_attr);
717
718         if (res == 0) {
719                 pool->num_threads += 1;
720         }
721
722         return res;
723 }
724
725 int pthreadpool_add_job(struct pthreadpool *pool, int job_id,
726                         void (*fn)(void *private_data), void *private_data)
727 {
728         int res;
729         int unlock_res;
730
731         assert(!pool->destroyed);
732
733         res = pthread_mutex_lock(&pool->mutex);
734         if (res != 0) {
735                 return res;
736         }
737
738         if (pool->stopped) {
739                 /*
740                  * Protect against the pool being shut down while
741                  * trying to add a job
742                  */
743                 unlock_res = pthread_mutex_unlock(&pool->mutex);
744                 assert(unlock_res == 0);
745                 return EINVAL;
746         }
747
748         if (pool->max_threads == 0) {
749                 unlock_res = pthread_mutex_unlock(&pool->mutex);
750                 assert(unlock_res == 0);
751
752                 /*
753                  * If no thread are allowed we do strict sync processing.
754                  */
755                 fn(private_data);
756                 res = pool->signal_fn(job_id, fn, private_data,
757                                       pool->signal_fn_private_data);
758                 return res;
759         }
760
761         /*
762          * Add job to the end of the queue
763          */
764         if (!pthreadpool_put_job(pool, job_id, fn, private_data)) {
765                 unlock_res = pthread_mutex_unlock(&pool->mutex);
766                 assert(unlock_res == 0);
767                 return ENOMEM;
768         }
769
770         if (pool->num_idle > 0) {
771                 /*
772                  * We have idle threads, wake one.
773                  */
774                 res = pthread_cond_signal(&pool->condvar);
775                 if (res != 0) {
776                         pthreadpool_undo_put_job(pool);
777                 }
778                 unlock_res = pthread_mutex_unlock(&pool->mutex);
779                 assert(unlock_res == 0);
780                 return res;
781         }
782
783         if (pool->num_threads >= pool->max_threads) {
784                 /*
785                  * No more new threads, we just queue the request
786                  */
787                 unlock_res = pthread_mutex_unlock(&pool->mutex);
788                 assert(unlock_res == 0);
789                 return 0;
790         }
791
792         res = pthreadpool_create_thread(pool);
793         if (res == 0) {
794                 unlock_res = pthread_mutex_unlock(&pool->mutex);
795                 assert(unlock_res == 0);
796                 return 0;
797         }
798
799         if (pool->num_threads != 0) {
800                 /*
801                  * At least one thread is still available, let
802                  * that one run the queued job.
803                  */
804                 unlock_res = pthread_mutex_unlock(&pool->mutex);
805                 assert(unlock_res == 0);
806                 return 0;
807         }
808
809         pthreadpool_undo_put_job(pool);
810
811         unlock_res = pthread_mutex_unlock(&pool->mutex);
812         assert(unlock_res == 0);
813
814         return res;
815 }
816
817 size_t pthreadpool_cancel_job(struct pthreadpool *pool, int job_id,
818                               void (*fn)(void *private_data), void *private_data)
819 {
820         int res;
821         size_t i, j;
822         size_t num = 0;
823
824         assert(!pool->destroyed);
825
826         res = pthread_mutex_lock(&pool->mutex);
827         if (res != 0) {
828                 return res;
829         }
830
831         for (i = 0, j = 0; i < pool->num_jobs; i++) {
832                 size_t idx = (pool->head + i) % pool->jobs_array_len;
833                 size_t new_idx = (pool->head + j) % pool->jobs_array_len;
834                 struct pthreadpool_job *job = &pool->jobs[idx];
835
836                 if ((job->private_data == private_data) &&
837                     (job->id == job_id) &&
838                     (job->fn == fn))
839                 {
840                         /*
841                          * Just skip the entry.
842                          */
843                         num++;
844                         continue;
845                 }
846
847                 /*
848                  * If we already removed one or more jobs (so j will be smaller
849                  * then i), we need to fill possible gaps in the logical list.
850                  */
851                 if (j < i) {
852                         pool->jobs[new_idx] = *job;
853                 }
854                 j++;
855         }
856
857         pool->num_jobs -= num;
858
859         res = pthread_mutex_unlock(&pool->mutex);
860         assert(res == 0);
861
862         return num;
863 }