tevent: add tevent_context_wrapper_create() infrastructure
[metze/samba/wip.git] / lib / tevent / tevent_threads.c
1 /*
2    tevent event library.
3
4    Copyright (C) Jeremy Allison 2015
5
6      ** NOTE! The following LGPL license applies to the tevent
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Lesser General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public
21    License along with this library; if not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/filesys.h"
26 #include "talloc.h"
27 #include "tevent.h"
28 #include "tevent_internal.h"
29 #include "tevent_util.h"
30
31 #ifdef HAVE_PTHREAD
32 #include "system/threads.h"
33
34 struct tevent_immediate_list {
35         struct tevent_immediate_list *next, *prev;
36         tevent_immediate_handler_t handler;
37         struct tevent_immediate *im;
38         void *private_ptr;
39 };
40
41 struct tevent_thread_proxy {
42         pthread_mutex_t mutex;
43         struct tevent_context *dest_ev_ctx;
44         int read_fd;
45         int write_fd;
46         struct tevent_fd *pipe_read_fde;
47         /* Pending events list. */
48         struct tevent_immediate_list *im_list;
49         /* Completed events list. */
50         struct tevent_immediate_list *tofree_im_list;
51         struct tevent_immediate *free_im;
52 };
53
54 static void free_im_list(struct tevent_immediate_list **pp_list_head)
55 {
56         struct tevent_immediate_list *im_entry = NULL;
57         struct tevent_immediate_list *im_next = NULL;
58
59         for (im_entry = *pp_list_head; im_entry; im_entry = im_next) {
60                 im_next = im_entry->next;
61                 DLIST_REMOVE(*pp_list_head, im_entry);
62                 TALLOC_FREE(im_entry);
63         }
64 }
65
66 static void free_list_handler(struct tevent_context *ev,
67                                 struct tevent_immediate *im,
68                                 void *private_ptr)
69 {
70         struct tevent_thread_proxy *tp =
71                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
72         int ret;
73
74         ret = pthread_mutex_lock(&tp->mutex);
75         if (ret != 0) {
76                 abort();
77                 /* Notreached. */
78                 return;
79         }
80
81         free_im_list(&tp->tofree_im_list);
82
83         ret = pthread_mutex_unlock(&tp->mutex);
84         if (ret != 0) {
85                 abort();
86                 /* Notreached. */
87                 return;
88         }
89 }
90
91 static void schedule_immediate_functions(struct tevent_thread_proxy *tp)
92 {
93         struct tevent_immediate_list *im_entry = NULL;
94         struct tevent_immediate_list *im_next = NULL;
95
96         for (im_entry = tp->im_list; im_entry; im_entry = im_next) {
97                 im_next = im_entry->next;
98                 DLIST_REMOVE(tp->im_list, im_entry);
99
100                 tevent_schedule_immediate(im_entry->im,
101                                         tp->dest_ev_ctx,
102                                         im_entry->handler,
103                                         im_entry->private_ptr);
104
105                 /* Move from pending list to free list. */
106                 DLIST_ADD(tp->tofree_im_list, im_entry);
107         }
108         if (tp->tofree_im_list != NULL) {
109                 /*
110                  * Once the current immediate events
111                  * are processed, we need to reschedule
112                  * ourselves to free them. This works
113                  * as tevent_schedule_immediate()
114                  * always adds events to the *END* of
115                  * the immediate events list.
116                  */
117                 tevent_schedule_immediate(tp->free_im,
118                                         tp->dest_ev_ctx,
119                                         free_list_handler,
120                                         tp);
121         }
122 }
123
124 static void pipe_read_handler(struct tevent_context *ev,
125                                 struct tevent_fd *fde,
126                                 uint16_t flags,
127                                 void *private_ptr)
128 {
129         struct tevent_thread_proxy *tp =
130                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
131         ssize_t len = 64;
132         int ret;
133
134         ret = pthread_mutex_lock(&tp->mutex);
135         if (ret != 0) {
136                 abort();
137                 /* Notreached. */
138                 return;
139         }
140
141         /*
142          * Clear out all data in the pipe. We
143          * don't really care if this returns -1.
144          */
145         while (len == 64) {
146                 char buf[64];
147                 len = read(tp->read_fd, buf, 64);
148         };
149
150         schedule_immediate_functions(tp);
151
152         ret = pthread_mutex_unlock(&tp->mutex);
153         if (ret != 0) {
154                 abort();
155                 /* Notreached. */
156                 return;
157         }
158 }
159
160 static int tevent_thread_proxy_destructor(struct tevent_thread_proxy *tp)
161 {
162         int ret;
163
164         ret = pthread_mutex_lock(&tp->mutex);
165         if (ret != 0) {
166                 abort();
167                 /* Notreached. */
168                 return 0;
169         }
170
171         TALLOC_FREE(tp->pipe_read_fde);
172
173         if (tp->read_fd != -1) {
174                 (void)close(tp->read_fd);
175                 tp->read_fd = -1;
176         }
177         if (tp->write_fd != -1) {
178                 (void)close(tp->write_fd);
179                 tp->write_fd = -1;
180         }
181
182         /* Hmmm. It's probably an error if we get here with
183            any non-NULL immediate entries.. */
184
185         free_im_list(&tp->im_list);
186         free_im_list(&tp->tofree_im_list);
187
188         TALLOC_FREE(tp->free_im);
189
190         ret = pthread_mutex_unlock(&tp->mutex);
191         if (ret != 0) {
192                 abort();
193                 /* Notreached. */
194                 return 0;
195         }
196
197         ret = pthread_mutex_destroy(&tp->mutex);
198         if (ret != 0) {
199                 abort();
200                 /* Notreached. */
201                 return 0;
202         }
203
204         return 0;
205 }
206
207 /*
208  * Create a struct that can be passed to other threads
209  * to allow them to signal the struct tevent_context *
210  * passed in.
211  */
212
213 struct tevent_thread_proxy *tevent_thread_proxy_create(
214                 struct tevent_context *dest_ev_ctx)
215 {
216         int ret;
217         int pipefds[2];
218         struct tevent_thread_proxy *tp;
219
220         if (dest_ev_ctx->wrapper.glue != NULL) {
221                 /*
222                  * stacking of wrappers is not supported
223                  */
224                 tevent_debug(dest_ev_ctx->wrapper.glue->main_ev,
225                              TEVENT_DEBUG_FATAL,
226                              "%s() not allowed on a wrapper context\n",
227                              __func__);
228                 errno = EINVAL;
229                 return NULL;
230         }
231
232         tp = talloc_zero(dest_ev_ctx, struct tevent_thread_proxy);
233         if (tp == NULL) {
234                 return NULL;
235         }
236
237         ret = pthread_mutex_init(&tp->mutex, NULL);
238         if (ret != 0) {
239                 goto fail;
240         }
241
242         tp->dest_ev_ctx = dest_ev_ctx;
243         tp->read_fd = -1;
244         tp->write_fd = -1;
245
246         talloc_set_destructor(tp, tevent_thread_proxy_destructor);
247
248         ret = pipe(pipefds);
249         if (ret == -1) {
250                 goto fail;
251         }
252
253         tp->read_fd = pipefds[0];
254         tp->write_fd = pipefds[1];
255
256         ret = ev_set_blocking(pipefds[0], false);
257         if (ret != 0) {
258                 goto fail;
259         }
260         ret = ev_set_blocking(pipefds[1], false);
261         if (ret != 0) {
262                 goto fail;
263         }
264         if (!ev_set_close_on_exec(pipefds[0])) {
265                 goto fail;
266         }
267         if (!ev_set_close_on_exec(pipefds[1])) {
268                 goto fail;
269         }
270
271         tp->pipe_read_fde = tevent_add_fd(dest_ev_ctx,
272                                 tp,
273                                 tp->read_fd,
274                                 TEVENT_FD_READ,
275                                 pipe_read_handler,
276                                 tp);
277         if (tp->pipe_read_fde == NULL) {
278                 goto fail;
279         }
280
281         /*
282          * Create an immediate event to free
283          * completed lists.
284          */
285         tp->free_im = tevent_create_immediate(tp);
286         if (tp->free_im == NULL) {
287                 goto fail;
288         }
289
290         return tp;
291
292   fail:
293
294         TALLOC_FREE(tp);
295         return NULL;
296 }
297
298 /*
299  * This function schedules an immediate event to be called with argument
300  * *pp_private in the thread context of dest_ev_ctx. Caller doesn't
301  * wait for activation to take place, this is simply fire-and-forget.
302  *
303  * pp_im must be a pointer to an immediate event talloced on
304  * a context owned by the calling thread, or the NULL context.
305  * Ownership of *pp_im will be transfered to the tevent library.
306  *
307  * pp_private can be null, or contents of *pp_private must be
308  * talloc'ed memory on a context owned by the calling thread
309  * or the NULL context. If non-null, ownership of *pp_private will
310  * be transfered to the tevent library.
311  *
312  * If you want to return a message, have the destination use the
313  * same function call to send back to the caller.
314  */
315
316
317 void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
318                                   struct tevent_immediate **pp_im,
319                                   tevent_immediate_handler_t handler,
320                                   void *pp_private_data)
321 {
322         struct tevent_immediate_list *im_entry;
323         int ret;
324         char c;
325         ssize_t written;
326
327         ret = pthread_mutex_lock(&tp->mutex);
328         if (ret != 0) {
329                 abort();
330                 /* Notreached. */
331                 return;
332         }
333
334         if (tp->write_fd == -1) {
335                 /* In the process of being destroyed. Ignore. */
336                 goto end;
337         }
338
339         /* Create a new immediate_list entry. MUST BE ON THE NULL CONTEXT */
340         im_entry = talloc_zero(NULL, struct tevent_immediate_list);
341         if (im_entry == NULL) {
342                 goto end;
343         }
344
345         im_entry->handler = handler;
346         im_entry->im = talloc_move(im_entry, pp_im);
347
348         if (pp_private_data != NULL) {
349                 void **pptr = (void **)pp_private_data;
350                 im_entry->private_ptr = talloc_move(im_entry, pptr);
351         }
352
353         DLIST_ADD(tp->im_list, im_entry);
354
355         /* And notify the dest_ev_ctx to wake up. */
356         c = '\0';
357         do {
358                 written = write(tp->write_fd, &c, 1);
359         } while (written == -1 && errno == EINTR);
360
361   end:
362
363         ret = pthread_mutex_unlock(&tp->mutex);
364         if (ret != 0) {
365                 abort();
366                 /* Notreached. */
367         }
368 }
369 #else
370 /* !HAVE_PTHREAD */
371 struct tevent_thread_proxy *tevent_thread_proxy_create(
372                 struct tevent_context *dest_ev_ctx)
373 {
374         errno = ENOSYS;
375         return NULL;
376 }
377
378 void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
379                                   struct tevent_immediate **pp_im,
380                                   tevent_immediate_handler_t handler,
381                                   void *pp_private_data)
382 {
383         ;
384 }
385 #endif
386
387 static int tevent_threaded_context_destructor(
388         struct tevent_threaded_context *tctx)
389 {
390         struct tevent_context *main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
391         int ret;
392
393         if (main_ev != NULL) {
394                 DLIST_REMOVE(main_ev->threaded_contexts, tctx);
395         }
396
397         /*
398          * We have to coordinate with _tevent_threaded_schedule_immediate's
399          * unlock of the event_ctx_mutex. We're in the main thread here,
400          * and we can be scheduled before the helper thread finalizes its
401          * call _tevent_threaded_schedule_immediate. This means we would
402          * pthreadpool_destroy a locked mutex, which is illegal.
403          */
404         ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
405         if (ret != 0) {
406                 abort();
407         }
408
409         ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
410         if (ret != 0) {
411                 abort();
412         }
413
414         ret = pthread_mutex_destroy(&tctx->event_ctx_mutex);
415         if (ret != 0) {
416                 abort();
417         }
418
419         return 0;
420 }
421
422 struct tevent_threaded_context *tevent_threaded_context_create(
423         TALLOC_CTX *mem_ctx, struct tevent_context *ev)
424 {
425 #ifdef HAVE_PTHREAD
426         struct tevent_context *main_ev = tevent_wrapper_main_ev(ev);
427         struct tevent_threaded_context *tctx;
428         int ret;
429
430         ret = tevent_common_wakeup_init(main_ev);
431         if (ret != 0) {
432                 errno = ret;
433                 return NULL;
434         }
435
436         tctx = talloc(mem_ctx, struct tevent_threaded_context);
437         if (tctx == NULL) {
438                 return NULL;
439         }
440         tctx->event_ctx = ev;
441
442         ret = pthread_mutex_init(&tctx->event_ctx_mutex, NULL);
443         if (ret != 0) {
444                 TALLOC_FREE(tctx);
445                 return NULL;
446         }
447
448         DLIST_ADD(main_ev->threaded_contexts, tctx);
449         talloc_set_destructor(tctx, tevent_threaded_context_destructor);
450
451         return tctx;
452 #else
453         errno = ENOSYS;
454         return NULL;
455 #endif
456 }
457
458 static int tevent_threaded_schedule_immediate_destructor(struct tevent_immediate *im)
459 {
460         if (im->event_ctx != NULL) {
461                 abort();
462         }
463         return 0;
464 }
465
466 void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
467                                          struct tevent_immediate *im,
468                                          tevent_immediate_handler_t handler,
469                                          void *private_data,
470                                          const char *handler_name,
471                                          const char *location)
472 {
473 #ifdef HAVE_PTHREAD
474         const char *create_location = im->create_location;
475         struct tevent_context *main_ev = NULL;
476         int ret, wakeup_fd;
477
478         ret = pthread_mutex_lock(&tctx->event_ctx_mutex);
479         if (ret != 0) {
480                 abort();
481         }
482
483         if (tctx->event_ctx == NULL) {
484                 /*
485                  * Our event context is already gone.
486                  */
487                 ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
488                 if (ret != 0) {
489                         abort();
490                 }
491                 return;
492         }
493
494         if ((im->event_ctx != NULL) || (handler == NULL)) {
495                 abort();
496         }
497         if (im->destroyed) {
498                 abort();
499         }
500         if (im->busy) {
501                 abort();
502         }
503
504         main_ev = tevent_wrapper_main_ev(tctx->event_ctx);
505
506         *im = (struct tevent_immediate) {
507                 .event_ctx              = tctx->event_ctx,
508                 .handler                = handler,
509                 .private_data           = private_data,
510                 .handler_name           = handler_name,
511                 .create_location        = create_location,
512                 .schedule_location      = location,
513         };
514
515         talloc_set_destructor(im, tevent_threaded_schedule_immediate_destructor);
516
517         ret = pthread_mutex_lock(&main_ev->scheduled_mutex);
518         if (ret != 0) {
519                 abort();
520         }
521
522         DLIST_ADD_END(main_ev->scheduled_immediates, im);
523         wakeup_fd = main_ev->wakeup_fd;
524
525         ret = pthread_mutex_unlock(&main_ev->scheduled_mutex);
526         if (ret != 0) {
527                 abort();
528         }
529
530         ret = pthread_mutex_unlock(&tctx->event_ctx_mutex);
531         if (ret != 0) {
532                 abort();
533         }
534
535         /*
536          * We might want to wake up the main thread under the lock. We
537          * had a slightly similar situation in pthreadpool, changed
538          * with 1c4284c7395f23. This is not exactly the same, as the
539          * wakeup is only a last-resort thing in case the main thread
540          * is sleeping. Doing the wakeup under the lock can easily
541          * lead to a contended mutex, which is much more expensive
542          * than a noncontended one. So I'd opt for the lower footprint
543          * initially. Maybe we have to change that later.
544          */
545         tevent_common_wakeup_fd(wakeup_fd);
546 #else
547         /*
548          * tevent_threaded_context_create() returned NULL with ENOSYS...
549          */
550         abort();
551 #endif
552 }
553
554 void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
555 {
556 #ifdef HAVE_PTHREAD
557         int ret;
558         ret = pthread_mutex_lock(&ev->scheduled_mutex);
559         if (ret != 0) {
560                 abort();
561         }
562
563         while (ev->scheduled_immediates != NULL) {
564                 struct tevent_immediate *im = ev->scheduled_immediates;
565                 struct tevent_immediate copy = *im;
566
567                 DLIST_REMOVE(ev->scheduled_immediates, im);
568
569                 tevent_debug(ev, TEVENT_DEBUG_TRACE,
570                              "Schedule immediate event \"%s\": %p from thread into main\n",
571                              im->handler_name, im);
572                 im->handler_name = NULL;
573                 _tevent_schedule_immediate(im,
574                                            ev,
575                                            copy.handler,
576                                            copy.private_data,
577                                            copy.handler_name,
578                                            copy.schedule_location);
579         }
580
581         ret = pthread_mutex_unlock(&ev->scheduled_mutex);
582         if (ret != 0) {
583                 abort();
584         }
585 #else
586         /*
587          * tevent_threaded_context_create() returned NULL with ENOSYS...
588          */
589         abort();
590 #endif
591 }