e7e17d3bf0f7ce3804a653634039ec987c892a13
[samba.git] / lib / pthreadpool / pthreadpool_tevent.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * threadpool implementation based on pthreads
4  * Copyright (C) Volker Lendecke 2009,2011
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License as published by
8  * the Free Software Foundation; either version 3 of the License, or
9  * (at your option) any later version.
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  * GNU General Public License for more details.
15  *
16  * You should have received a copy of the GNU General Public License
17  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
18  */
19
20 #include "replace.h"
21 #include "pthreadpool_tevent.h"
22 #include "pthreadpool.h"
23 #include "lib/util/tevent_unix.h"
24 #include "lib/util/dlinklist.h"
25
26 struct pthreadpool_tevent_job_state;
27
28 /*
29  * We need one pthreadpool_tevent_glue object per unique combintaion of tevent
30  * contexts and pthreadpool_tevent objects. Maintain a list of used tevent
31  * contexts in a pthreadpool_tevent.
32  */
33 struct pthreadpool_tevent_glue {
34         struct pthreadpool_tevent_glue *prev, *next;
35         struct pthreadpool_tevent *pool; /* back-pointer to owning object. */
36         /* Tuple we are keeping track of in this list. */
37         struct tevent_context *ev;
38         struct tevent_threaded_context *tctx;
39         /* Pointer to link object owned by *ev. */
40         struct pthreadpool_tevent_glue_ev_link *ev_link;
41 };
42
43 /*
44  * The pthreadpool_tevent_glue_ev_link and its destructor ensure we remove the
45  * tevent context from our list of active event contexts if the event context
46  * is destroyed.
47  * This structure is talloc()'ed from the struct tevent_context *, and is a
48  * back-pointer allowing the related struct pthreadpool_tevent_glue object
49  * to be removed from the struct pthreadpool_tevent glue list if the owning
50  * tevent_context is talloc_free()'ed.
51  */
52 struct pthreadpool_tevent_glue_ev_link {
53         struct pthreadpool_tevent_glue *glue;
54 };
55
56 struct pthreadpool_tevent {
57         struct pthreadpool *pool;
58         struct pthreadpool_tevent_glue *glue_list;
59
60         struct pthreadpool_tevent_job *jobs;
61 };
62
63 struct pthreadpool_tevent_job_state {
64         struct tevent_context *ev;
65         struct tevent_req *req;
66         struct pthreadpool_tevent_job *job;
67 };
68
69 struct pthreadpool_tevent_job {
70         struct pthreadpool_tevent_job *prev, *next;
71
72         struct pthreadpool_tevent *pool;
73         struct pthreadpool_tevent_job_state *state;
74         struct tevent_immediate *im;
75
76         void (*fn)(void *private_data);
77         void *private_data;
78 };
79
80 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool);
81
82 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job);
83
84 static struct pthreadpool_tevent_job *orphaned_jobs;
85
86 void pthreadpool_tevent_cleanup_orphaned_jobs(void)
87 {
88         struct pthreadpool_tevent_job *job = NULL;
89         struct pthreadpool_tevent_job *njob = NULL;
90
91         for (job = orphaned_jobs; job != NULL; job = njob) {
92                 njob = job->next;
93
94                 /*
95                  * The job destructor keeps the job alive
96                  * (and in the list) or removes it from the list.
97                  */
98                 TALLOC_FREE(job);
99         }
100 }
101
102 static int pthreadpool_tevent_job_signal(int jobid,
103                                          void (*job_fn)(void *private_data),
104                                          void *job_private_data,
105                                          void *private_data);
106
107 int pthreadpool_tevent_init(TALLOC_CTX *mem_ctx, unsigned max_threads,
108                             struct pthreadpool_tevent **presult)
109 {
110         struct pthreadpool_tevent *pool;
111         int ret;
112
113         pthreadpool_tevent_cleanup_orphaned_jobs();
114
115         pool = talloc_zero(mem_ctx, struct pthreadpool_tevent);
116         if (pool == NULL) {
117                 return ENOMEM;
118         }
119
120         ret = pthreadpool_init(max_threads, &pool->pool,
121                                pthreadpool_tevent_job_signal, pool);
122         if (ret != 0) {
123                 TALLOC_FREE(pool);
124                 return ret;
125         }
126
127         talloc_set_destructor(pool, pthreadpool_tevent_destructor);
128
129         *presult = pool;
130         return 0;
131 }
132
133 size_t pthreadpool_tevent_max_threads(struct pthreadpool_tevent *pool)
134 {
135         if (pool->pool == NULL) {
136                 return 0;
137         }
138
139         return pthreadpool_max_threads(pool->pool);
140 }
141
142 size_t pthreadpool_tevent_queued_jobs(struct pthreadpool_tevent *pool)
143 {
144         if (pool->pool == NULL) {
145                 return 0;
146         }
147
148         return pthreadpool_queued_jobs(pool->pool);
149 }
150
151 static int pthreadpool_tevent_destructor(struct pthreadpool_tevent *pool)
152 {
153         struct pthreadpool_tevent_job *job = NULL;
154         struct pthreadpool_tevent_job *njob = NULL;
155         struct pthreadpool_tevent_glue *glue = NULL;
156         int ret;
157
158         ret = pthreadpool_stop(pool->pool);
159         if (ret != 0) {
160                 return ret;
161         }
162
163         for (job = pool->jobs; job != NULL; job = njob) {
164                 njob = job->next;
165
166                 /* The job this removes it from the list */
167                 pthreadpool_tevent_job_orphan(job);
168         }
169
170         /*
171          * Delete all the registered
172          * tevent_context/tevent_threaded_context
173          * pairs.
174          */
175         for (glue = pool->glue_list; glue != NULL; glue = pool->glue_list) {
176                 /* The glue destructor removes it from the list */
177                 TALLOC_FREE(glue);
178         }
179         pool->glue_list = NULL;
180
181         ret = pthreadpool_destroy(pool->pool);
182         if (ret != 0) {
183                 return ret;
184         }
185         pool->pool = NULL;
186
187         pthreadpool_tevent_cleanup_orphaned_jobs();
188
189         return 0;
190 }
191
192 static int pthreadpool_tevent_glue_destructor(
193         struct pthreadpool_tevent_glue *glue)
194 {
195         if (glue->pool->glue_list != NULL) {
196                 DLIST_REMOVE(glue->pool->glue_list, glue);
197         }
198
199         /* Ensure the ev_link destructor knows we're gone */
200         glue->ev_link->glue = NULL;
201
202         TALLOC_FREE(glue->ev_link);
203         TALLOC_FREE(glue->tctx);
204
205         return 0;
206 }
207
208 /*
209  * Destructor called either explicitly from
210  * pthreadpool_tevent_glue_destructor(), or indirectly
211  * when owning tevent_context is destroyed.
212  *
213  * When called from pthreadpool_tevent_glue_destructor()
214  * ev_link->glue is already NULL, so this does nothing.
215  *
216  * When called from talloc_free() of the owning
217  * tevent_context we must ensure we also remove the
218  * linked glue object from the list inside
219  * struct pthreadpool_tevent.
220  */
221 static int pthreadpool_tevent_glue_link_destructor(
222         struct pthreadpool_tevent_glue_ev_link *ev_link)
223 {
224         TALLOC_FREE(ev_link->glue);
225         return 0;
226 }
227
228 static int pthreadpool_tevent_register_ev(struct pthreadpool_tevent *pool,
229                                           struct tevent_context *ev)
230 {
231         struct pthreadpool_tevent_glue *glue = NULL;
232         struct pthreadpool_tevent_glue_ev_link *ev_link = NULL;
233
234         /*
235          * See if this tevent_context was already registered by
236          * searching the glue object list. If so we have nothing
237          * to do here - we already have a tevent_context/tevent_threaded_context
238          * pair.
239          */
240         for (glue = pool->glue_list; glue != NULL; glue = glue->next) {
241                 if (glue->ev == ev) {
242                         return 0;
243                 }
244         }
245
246         /*
247          * Event context not yet registered - create a new glue
248          * object containing a tevent_context/tevent_threaded_context
249          * pair and put it on the list to remember this registration.
250          * We also need a link object to ensure the event context
251          * can't go away without us knowing about it.
252          */
253         glue = talloc_zero(pool, struct pthreadpool_tevent_glue);
254         if (glue == NULL) {
255                 return ENOMEM;
256         }
257         *glue = (struct pthreadpool_tevent_glue) {
258                 .pool = pool,
259                 .ev = ev,
260         };
261         talloc_set_destructor(glue, pthreadpool_tevent_glue_destructor);
262
263         /*
264          * Now allocate the link object to the event context. Note this
265          * is allocated OFF THE EVENT CONTEXT ITSELF, so if the event
266          * context is freed we are able to cleanup the glue object
267          * in the link object destructor.
268          */
269
270         ev_link = talloc_zero(ev, struct pthreadpool_tevent_glue_ev_link);
271         if (ev_link == NULL) {
272                 TALLOC_FREE(glue);
273                 return ENOMEM;
274         }
275         ev_link->glue = glue;
276         talloc_set_destructor(ev_link, pthreadpool_tevent_glue_link_destructor);
277
278         glue->ev_link = ev_link;
279
280 #ifdef HAVE_PTHREAD
281         glue->tctx = tevent_threaded_context_create(glue, ev);
282         if (glue->tctx == NULL) {
283                 TALLOC_FREE(ev_link);
284                 TALLOC_FREE(glue);
285                 return ENOMEM;
286         }
287 #endif
288
289         DLIST_ADD(pool->glue_list, glue);
290         return 0;
291 }
292
293 static void pthreadpool_tevent_job_fn(void *private_data);
294 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
295                                         struct tevent_immediate *im,
296                                         void *private_data);
297 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req);
298
299 static int pthreadpool_tevent_job_destructor(struct pthreadpool_tevent_job *job)
300 {
301         /*
302          * We should never be called with state->state != NULL.
303          * Only pthreadpool_tevent_job_orphan() will call TALLOC_FREE(job)
304          * after detaching from the request state and pool list.
305          */
306         if (job->state != NULL) {
307                 abort();
308         }
309
310         /*
311          * If the job is not finished (job->im still there)
312          * and it's still attached to the pool,
313          * we try to cancel it (before it was starts)
314          */
315         if (job->im != NULL && job->pool != NULL) {
316                 size_t num;
317
318                 num = pthreadpool_cancel_job(job->pool->pool, 0,
319                                              pthreadpool_tevent_job_fn,
320                                              job);
321                 if (num != 0) {
322                         /*
323                          * It was not too late to cancel the request.
324                          *
325                          * We can remove job->im, as it will never be used.
326                          */
327                         TALLOC_FREE(job->im);
328                 }
329         }
330
331         /*
332          * pthreadpool_tevent_job_orphan() already removed
333          * it from pool->jobs. And we don't need try
334          * pthreadpool_cancel_job() again.
335          */
336         job->pool = NULL;
337
338         if (job->im != NULL) {
339                 /*
340                  * state->im still there means, we need to wait for the
341                  * immediate event to be triggered or just leak the memory.
342                  *
343                  * Move it to the orphaned list, if it's not already there.
344                  */
345                 return -1;
346         }
347
348         /*
349          * Finally remove from the orphaned_jobs list
350          * and let talloc destroy us.
351          */
352         DLIST_REMOVE(orphaned_jobs, job);
353
354         return 0;
355 }
356
357 static void pthreadpool_tevent_job_orphan(struct pthreadpool_tevent_job *job)
358 {
359         /*
360          * We're the only function that sets
361          * job->state = NULL;
362          */
363         if (job->state == NULL) {
364                 abort();
365         }
366
367         /*
368          * We need to reparent to a long term context.
369          * And detach from the request state.
370          * Maybe the destructor will keep the memory
371          * and leak it for now.
372          */
373         (void)talloc_reparent(job->state, NULL, job);
374         job->state->job = NULL;
375         job->state = NULL;
376
377         /*
378          * job->pool will only be set to NULL
379          * in the first destructur run.
380          */
381         if (job->pool == NULL) {
382                 abort();
383         }
384
385         /*
386          * Dettach it from the pool.
387          *
388          * The job might still be running,
389          * so we keep job->pool.
390          * The destructor will set it to NULL
391          * after trying pthreadpool_cancel_job()
392          */
393         DLIST_REMOVE(job->pool->jobs, job);
394
395         /*
396          * Add it to the list of orphaned jobs,
397          * which may be cleaned up later.
398          *
399          * The destructor removes it from the list
400          * when possible or it denies the free
401          * and keep it in the list.
402          */
403         DLIST_ADD_END(orphaned_jobs, job);
404         TALLOC_FREE(job);
405 }
406
407 static void pthreadpool_tevent_job_cleanup(struct tevent_req *req,
408                                            enum tevent_req_state req_state)
409 {
410         struct pthreadpool_tevent_job_state *state =
411                 tevent_req_data(req,
412                 struct pthreadpool_tevent_job_state);
413
414         if (state->job == NULL) {
415                 /*
416                  * The job request is not scheduled in the pool
417                  * yet or anymore.
418                  */
419                 return;
420         }
421
422         /*
423          * We need to reparent to a long term context.
424          * Maybe the destructor will keep the memory
425          * and leak it for now.
426          */
427         pthreadpool_tevent_job_orphan(state->job);
428         state->job = NULL; /* not needed but looks better */
429         return;
430 }
431
432 struct tevent_req *pthreadpool_tevent_job_send(
433         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
434         struct pthreadpool_tevent *pool,
435         void (*fn)(void *private_data), void *private_data)
436 {
437         struct tevent_req *req = NULL;
438         struct pthreadpool_tevent_job_state *state = NULL;
439         struct pthreadpool_tevent_job *job = NULL;
440         int ret;
441
442         pthreadpool_tevent_cleanup_orphaned_jobs();
443
444         req = tevent_req_create(mem_ctx, &state,
445                                 struct pthreadpool_tevent_job_state);
446         if (req == NULL) {
447                 return NULL;
448         }
449         state->ev = ev;
450         state->req = req;
451
452         tevent_req_set_cleanup_fn(req, pthreadpool_tevent_job_cleanup);
453
454         if (pool == NULL) {
455                 tevent_req_error(req, EINVAL);
456                 return tevent_req_post(req, ev);
457         }
458         if (pool->pool == NULL) {
459                 tevent_req_error(req, EINVAL);
460                 return tevent_req_post(req, ev);
461         }
462
463         ret = pthreadpool_tevent_register_ev(pool, ev);
464         if (tevent_req_error(req, ret)) {
465                 return tevent_req_post(req, ev);
466         }
467
468         job = talloc_zero(state, struct pthreadpool_tevent_job);
469         if (tevent_req_nomem(job, req)) {
470                 return tevent_req_post(req, ev);
471         }
472         job->pool = pool;
473         job->fn = fn;
474         job->private_data = private_data;
475         job->im = tevent_create_immediate(state->job);
476         if (tevent_req_nomem(job->im, req)) {
477                 return tevent_req_post(req, ev);
478         }
479         talloc_set_destructor(job, pthreadpool_tevent_job_destructor);
480         DLIST_ADD_END(job->pool->jobs, job);
481         job->state = state;
482         state->job = job;
483
484         ret = pthreadpool_add_job(job->pool->pool, 0,
485                                   pthreadpool_tevent_job_fn,
486                                   job);
487         if (tevent_req_error(req, ret)) {
488                 return tevent_req_post(req, ev);
489         }
490
491         tevent_req_set_cancel_fn(req, pthreadpool_tevent_job_cancel);
492         return req;
493 }
494
495 static void pthreadpool_tevent_job_fn(void *private_data)
496 {
497         struct pthreadpool_tevent_job *job =
498                 talloc_get_type_abort(private_data,
499                 struct pthreadpool_tevent_job);
500
501         job->fn(job->private_data);
502 }
503
504 static int pthreadpool_tevent_job_signal(int jobid,
505                                          void (*job_fn)(void *private_data),
506                                          void *job_private_data,
507                                          void *private_data)
508 {
509         struct pthreadpool_tevent_job *job =
510                 talloc_get_type_abort(job_private_data,
511                 struct pthreadpool_tevent_job);
512         struct pthreadpool_tevent_job_state *state = job->state;
513         struct tevent_threaded_context *tctx = NULL;
514         struct pthreadpool_tevent_glue *g = NULL;
515
516         if (state == NULL) {
517                 /* Request already gone */
518                 return 0;
519         }
520
521 #ifdef HAVE_PTHREAD
522         for (g = job->pool->glue_list; g != NULL; g = g->next) {
523                 if (g->ev == state->ev) {
524                         tctx = g->tctx;
525                         break;
526                 }
527         }
528
529         if (tctx == NULL) {
530                 abort();
531         }
532 #endif
533
534         if (tctx != NULL) {
535                 /* with HAVE_PTHREAD */
536                 tevent_threaded_schedule_immediate(tctx, job->im,
537                                                    pthreadpool_tevent_job_done,
538                                                    job);
539         } else {
540                 /* without HAVE_PTHREAD */
541                 tevent_schedule_immediate(job->im, state->ev,
542                                           pthreadpool_tevent_job_done,
543                                           job);
544         }
545
546         return 0;
547 }
548
549 static void pthreadpool_tevent_job_done(struct tevent_context *ctx,
550                                         struct tevent_immediate *im,
551                                         void *private_data)
552 {
553         struct pthreadpool_tevent_job *job =
554                 talloc_get_type_abort(private_data,
555                 struct pthreadpool_tevent_job);
556         struct pthreadpool_tevent_job_state *state = job->state;
557
558         TALLOC_FREE(job->im);
559
560         if (state == NULL) {
561                 /* Request already gone */
562                 TALLOC_FREE(job);
563                 return;
564         }
565
566         /*
567          * pthreadpool_tevent_job_cleanup()
568          * will destroy the job.
569          */
570         tevent_req_done(state->req);
571 }
572
573 static bool pthreadpool_tevent_job_cancel(struct tevent_req *req)
574 {
575         struct pthreadpool_tevent_job_state *state =
576                 tevent_req_data(req,
577                 struct pthreadpool_tevent_job_state);
578         struct pthreadpool_tevent_job *job = state->job;
579         size_t num;
580
581         if (job == NULL) {
582                 return false;
583         }
584
585         num = pthreadpool_cancel_job(job->pool->pool, 0,
586                                      pthreadpool_tevent_job_fn,
587                                      job);
588         if (num == 0) {
589                 /*
590                  * It was too late to cancel the request.
591                  */
592                 return false;
593         }
594
595         /*
596          * It was not too late to cancel the request.
597          *
598          * We can remove job->im, as it will never be used.
599          */
600         TALLOC_FREE(job->im);
601
602         /*
603          * pthreadpool_tevent_job_cleanup()
604          * will destroy the job.
605          */
606         tevent_req_defer_callback(req, state->ev);
607         tevent_req_error(req, ECANCELED);
608         return true;
609 }
610
611 int pthreadpool_tevent_job_recv(struct tevent_req *req)
612 {
613         return tevent_req_simple_recv_unix(req);
614 }