tevent: Add threaded immediate activation
authorVolker Lendecke <vl@samba.org>
Mon, 8 Aug 2016 09:26:37 +0000 (11:26 +0200)
committerJeremy Allison <jra@samba.org>
Tue, 23 Aug 2016 23:33:48 +0000 (01:33 +0200)
This is infrastructure to improve our async r/w result handling and latency.
The pthreadpool signalling goes through a pipe. This has downsides: The main
event loop has to go through a read on the pipe before it can ship the result.
Also, it is not guaranteed by poll/epoll that the pthreadpool signal pipe is
handled with top priority. When an async pread/pwrite has finished, we should
immediately ship the result to the client, not waiting for anything else.

This patch enables tevent_immediate structs as job signalling. This means a
busy main tevent loop will handle the threaded job completion before any timed
or file descriptor events. Opposite to Jeremy's tevent_thread_proxy this is
done by a modification of the main event loop by looking at a linked list under
a central mutex.

Regarding performance: In a later commit I've created a test that does nothing
but fire one immediate over and over again. If you add a pthread_mutex_lock and
unlock pair in the immediate handler, you lose roughly 25% of rounds per
second, so it is measurable. It is questionable whether that will be measurable
in the real world, but to counter such concerns activation of immediates needs to go
through a new struct tevent_threaded_context. Only if such a
tevent_threaded_context exists for a tevent context, the main loop takes the
hit to look at the mutex'ed list of finished jobs.

This patch by design does not care about talloc hierarchies. The idea is that
the main thread owning the tevent context creates a chunk of memory and
prepares the tevent_immediate indication job completion. The main thread hands
the memory chunk together with the immediate as a job description over to a
helper thread. The helper thread does its job and upon completion calls
tevent_threaded_schedule_immediate with the already-prepared immediate. From
that point on memory ownership is again transferred to the main thread.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
lib/tevent/ABI/tevent-0.9.29.sigs
lib/tevent/tevent.c
lib/tevent/tevent.h
lib/tevent/tevent_epoll.c
lib/tevent/tevent_internal.h
lib/tevent/tevent_poll.c
lib/tevent/tevent_port.c
lib/tevent/tevent_select.c
lib/tevent/tevent_threads.c
lib/tevent/wscript

index 4b647419d081f52d5ec5f7d4a20cee944e1d9d0f..9b8bfa1cedd88620956e7415b4d127edc8d58395 100644 (file)
@@ -16,6 +16,7 @@ _tevent_req_nomem: bool (const void *, struct tevent_req *, const char *)
 _tevent_req_notify_callback: void (struct tevent_req *, const char *)
 _tevent_req_oom: void (struct tevent_req *, const char *)
 _tevent_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
+_tevent_threaded_schedule_immediate: void (struct tevent_threaded_context *, struct tevent_immediate *, tevent_immediate_handler_t, void *, const char *, const char *)
 tevent_backend_list: const char **(TALLOC_CTX *)
 tevent_cleanup_pending_signal_handlers: void (struct tevent_signal *)
 tevent_common_add_fd: struct tevent_fd *(struct tevent_context *, TALLOC_CTX *, int, uint16_t, tevent_fd_handler_t, void *, const char *, const char *)
@@ -33,6 +34,7 @@ tevent_common_loop_immediate: bool (struct tevent_context *)
 tevent_common_loop_timer_delay: struct timeval (struct tevent_context *)
 tevent_common_loop_wait: int (struct tevent_context *, const char *)
 tevent_common_schedule_immediate: void (struct tevent_immediate *, struct tevent_context *, tevent_immediate_handler_t, void *, const char *, const char *)
+tevent_common_threaded_activate_immediate: void (struct tevent_context *)
 tevent_common_wakeup: int (struct tevent_context *)
 tevent_common_wakeup_init: int (struct tevent_context *)
 tevent_context_init: struct tevent_context *(TALLOC_CTX *)
@@ -80,6 +82,7 @@ tevent_set_trace_callback: void (struct tevent_context *, tevent_trace_callback_
 tevent_signal_support: bool (struct tevent_context *)
 tevent_thread_proxy_create: struct tevent_thread_proxy *(struct tevent_context *)
 tevent_thread_proxy_schedule: void (struct tevent_thread_proxy *, struct tevent_immediate **, tevent_immediate_handler_t, void *)
+tevent_threaded_context_create: struct tevent_threaded_context *(TALLOC_CTX *, struct tevent_context *)
 tevent_timeval_add: struct timeval (const struct timeval *, uint32_t, uint32_t)
 tevent_timeval_compare: int (const struct timeval *, const struct timeval *)
 tevent_timeval_current: struct timeval (void)
index 34cd402d5e46e69bf64e555f9a741b314cd29173..b8178b2d855388fd5dcbb77ce26f231b9480f0e6 100644 (file)
 */
 #include "replace.h"
 #include "system/filesys.h"
+#ifdef HAVE_PTHREAD
+#include "system/threads.h"
+#endif
 #define TEVENT_DEPRECATED 1
 #include "tevent.h"
 #include "tevent_internal.h"
 #include "tevent_util.h"
 
+static void tevent_abort(struct tevent_context *ev, const char *reason);
+
 struct tevent_ops_list {
        struct tevent_ops_list *next, *prev;
        const char *name;
@@ -173,6 +178,91 @@ const char **tevent_backend_list(TALLOC_CTX *mem_ctx)
        return list;
 }
 
+#ifdef HAVE_PTHREAD
+
+static pthread_mutex_t tevent_contexts_mutex = PTHREAD_MUTEX_INITIALIZER;
+static struct tevent_context *tevent_contexts = NULL;
+static pthread_once_t tevent_atfork_initialized = PTHREAD_ONCE_INIT;
+
+static void tevent_atfork_prepare(void)
+{
+       struct tevent_context *ev;
+       int ret;
+
+       ret = pthread_mutex_lock(&tevent_contexts_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       for (ev = tevent_contexts; ev != NULL; ev = ev->next) {
+               ret = pthread_mutex_lock(&ev->scheduled_mutex);
+               if (ret != 0) {
+                       tevent_abort(ev, "pthread_mutex_lock failed");
+               }
+       }
+}
+
+static void tevent_atfork_parent(void)
+{
+       struct tevent_context *ev;
+       int ret;
+
+       for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
+            ev = DLIST_PREV(ev)) {
+               ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+               if (ret != 0) {
+                       tevent_abort(ev, "pthread_mutex_unlock failed");
+               }
+       }
+
+       ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+       if (ret != 0) {
+               abort();
+       }
+}
+
+static void tevent_atfork_child(void)
+{
+       struct tevent_context *ev;
+       int ret;
+
+       for (ev = DLIST_TAIL(tevent_contexts); ev != NULL;
+            ev = DLIST_PREV(ev)) {
+               struct tevent_threaded_context *tctx;
+
+               for (tctx = ev->threaded_contexts; tctx != NULL;
+                    tctx = tctx->next) {
+                       tctx->event_ctx = NULL;
+               }
+
+               ev->threaded_contexts = NULL;
+
+               ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+               if (ret != 0) {
+                       tevent_abort(ev, "pthread_mutex_unlock failed");
+               }
+       }
+
+       ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+       if (ret != 0) {
+               abort();
+       }
+}
+
+static void tevent_prep_atfork(void)
+{
+       int ret;
+
+       ret = pthread_atfork(tevent_atfork_prepare,
+                            tevent_atfork_parent,
+                            tevent_atfork_child);
+       if (ret != 0) {
+               abort();
+       }
+}
+
+#endif
+
 int tevent_common_context_destructor(struct tevent_context *ev)
 {
        struct tevent_fd *fd, *fn;
@@ -180,6 +270,33 @@ int tevent_common_context_destructor(struct tevent_context *ev)
        struct tevent_immediate *ie, *in;
        struct tevent_signal *se, *sn;
 
+#ifdef HAVE_PTHREAD
+       int ret;
+
+       ret = pthread_mutex_lock(&tevent_contexts_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       DLIST_REMOVE(tevent_contexts, ev);
+
+       ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+       if (ret != 0) {
+               abort();
+       }
+#endif
+
+       if (ev->threaded_contexts != NULL) {
+               /*
+                * Threaded contexts are indicators that threads are
+                * about to send us immediates via
+                * tevent_threaded_schedule_immediate. The caller
+                * needs to make sure that the tevent context lives
+                * long enough to receive immediates from all threads.
+                */
+               tevent_abort(ev, "threaded contexts exist");
+       }
+
        if (ev->pipe_fde) {
                talloc_free(ev->pipe_fde);
                close(ev->pipe_fds[0]);
@@ -255,6 +372,36 @@ struct tevent_context *tevent_context_init_ops(TALLOC_CTX *mem_ctx,
        ev = talloc_zero(mem_ctx, struct tevent_context);
        if (!ev) return NULL;
 
+#ifdef HAVE_PTHREAD
+
+       ret = pthread_once(&tevent_atfork_initialized, tevent_prep_atfork);
+       if (ret != 0) {
+               talloc_free(ev);
+               return NULL;
+       }
+
+       ret = pthread_mutex_init(&ev->scheduled_mutex, NULL);
+       if (ret != 0) {
+               talloc_free(ev);
+               return NULL;
+       }
+
+       ret = pthread_mutex_lock(&tevent_contexts_mutex);
+       if (ret != 0) {
+               pthread_mutex_destroy(&ev->scheduled_mutex);
+               talloc_free(ev);
+               return NULL;
+       }
+
+       DLIST_ADD(tevent_contexts, ev);
+
+       ret = pthread_mutex_unlock(&tevent_contexts_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+#endif
+
        talloc_set_destructor(ev, tevent_common_context_destructor);
 
        ev->ops = ops;
index 1c1271b9400cfc5d2d355e41502b0e539d99f609..2432344fab5deddfff9c1254d0433ae5d0e074bf 100644 (file)
@@ -40,6 +40,7 @@ struct tevent_timer;
 struct tevent_immediate;
 struct tevent_signal;
 struct tevent_thread_proxy;
+struct tevent_threaded_context;
 
 /**
  * @defgroup tevent The tevent API
@@ -1750,6 +1751,79 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
                                  tevent_immediate_handler_t handler,
                                  void *pp_private_data);
 
+/*
+ * @brief Create a context for threaded activation of immediates
+ *
+ * A tevent_threaded_context provides a link into an event
+ * context. Using tevent_threaded_schedule_immediate, it is possible
+ * to activate an immediate event from within a thread.
+ *
+ * It is the duty of the caller of tevent_threaded_context_create() to
+ * keep the event context around longer than any
+ * tevent_threaded_context. tevent will abort if ev is talloc_free'ed
+ * with an active tevent_threaded_context.
+ *
+ * If tevent is built without pthread support, this always returns
+ * NULL with errno=ENOSYS.
+ *
+ * @param[in]  mem_ctx  The talloc memory context to use.
+ * @param[in]  ev       The event context to link this to.
+ * @return              The threaded context, or NULL with errno set.
+ *
+ * @see tevent_threaded_schedule_immediate()
+ *
+ * @note Available as of tevent 0.9.30
+ */
+struct tevent_threaded_context *tevent_threaded_context_create(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev);
+
+#ifdef DOXYGEN
+/*
+ * @brief Activate an immediate from a thread
+ *
+ * Activate an immediate from within a thread.
+ *
+ * This routine does not watch out for talloc hierarchies. This means
+ * that it is highly recommended to create the tevent_immediate in the
+ * thread owning tctx, allocate a threaded job description for the
+ * thread, hand over both pointers to a helper thread and not touch it
+ * in the main thread at all anymore.
+ *
+ * tevent_threaded_schedule_immediate is intended as a job completion
+ * indicator for simple threaded helpers.
+ *
+ * Please be aware that tevent_threaded_schedule_immediate is very
+ * picky about its arguments: An immediate may not already be
+ * activated and the handler must exist. With
+ * tevent_threaded_schedule_immediate memory ownership is transferred
+ * to the main thread holding the tevent context behind tctx, the
+ * helper thread can't access it anymore.
+ *
+ * @param[in]  tctx     The threaded context to go through
+ * @param[in]  im       The immediate event to activate
+ * @param[in]  handler  The immediate handler to call in the main thread
+ * @param[in]  private_data Pointer for the immediate handler
+ *
+ * @see tevent_threaded_context_create()
+ *
+ * @note Available as of tevent 0.9.30
+ */
+void tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+                                       struct tevent_immediate *im,
+                                       tevent_immediate_handler_t handler,
+                                       void *private_data);
+#else
+void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+                                        struct tevent_immediate *im,
+                                        tevent_immediate_handler_t handler,
+                                        void *private_data,
+                                        const char *handler_name,
+                                        const char *location);
+#define tevent_threaded_schedule_immediate(tctx, im, handler, private_data) \
+       _tevent_threaded_schedule_immediate(tctx, im, handler, private_data, \
+                                  #handler, __location__);
+#endif
+
 #ifdef TEVENT_DEPRECATED
 #ifndef _DEPRECATED_
 #ifdef HAVE___ATTRIBUTE__
index 507ea5ca7fde055a01ee0cb2a5649632cf1a64ed..4147c67af2a014ed16b829c7730ac0f1fbb93a34 100644 (file)
@@ -903,6 +903,10 @@ static int epoll_event_loop_once(struct tevent_context *ev, const char *location
                return 0;
        }
 
+       if (ev->threaded_contexts != NULL) {
+               tevent_common_threaded_activate_immediate(ev);
+       }
+
        if (ev->immediate_events &&
            tevent_common_loop_immediate(ev)) {
                return 0;
index 83627705095bcecd4b75bc0fe7d40f877f6af9fc..6b29547427ce36551a629c5bdc386f4295ac8a0e 100644 (file)
@@ -228,6 +228,11 @@ struct tevent_signal {
        void *additional_data;
 };
 
+struct tevent_threaded_context {
+       struct tevent_threaded_context *next, *prev;
+       struct tevent_context *event_ctx;
+};
+
 struct tevent_debug_ops {
        void (*debug)(void *context, enum tevent_debug_level level,
                      const char *fmt, va_list ap) PRINTF_ATTRIBUTE(3,0);
@@ -247,6 +252,13 @@ struct tevent_context {
        /* list of timed events - used by common code */
        struct tevent_timer *timer_events;
 
+       /* List of threaded job indicators */
+       struct tevent_threaded_context *threaded_contexts;
+
+       /* List of scheduled immediates */
+       pthread_mutex_t scheduled_mutex;
+       struct tevent_immediate *scheduled_immediates;
+
        /* list of immediate events - used by common code */
        struct tevent_immediate *immediate_events;
 
@@ -282,6 +294,10 @@ struct tevent_context {
         * tevent_common_add_timer_v2()
         */
        struct tevent_timer *last_zero_timer;
+
+#ifdef HAVE_PTHREAD
+       struct tevent_context *prev, *next;
+#endif
 };
 
 const struct tevent_ops *tevent_find_ops_byname(const char *name);
@@ -327,6 +343,7 @@ void tevent_common_schedule_immediate(struct tevent_immediate *im,
                                      const char *handler_name,
                                      const char *location);
 bool tevent_common_loop_immediate(struct tevent_context *ev);
+void tevent_common_threaded_activate_immediate(struct tevent_context *ev);
 
 bool tevent_common_have_events(struct tevent_context *ev);
 int tevent_common_wakeup_init(struct tevent_context *ev);
index 3547e912b2c07ba3cdabe2fbc5f8376646410c23..09d85fa322adff298eb6a1f5e7c72bccd6121881 100644 (file)
@@ -645,6 +645,10 @@ static int poll_event_loop_once(struct tevent_context *ev,
                return 0;
        }
 
+       if (ev->threaded_contexts != NULL) {
+               tevent_common_threaded_activate_immediate(ev);
+       }
+
        if (ev->immediate_events &&
            tevent_common_loop_immediate(ev)) {
                return 0;
index 4b524dff291d13139732c15062c3e99097b93e48..8cf9fd1a0de7aa9bc6abea8b21212cf2b420041c 100644 (file)
@@ -760,6 +760,10 @@ static int port_event_loop_once(struct tevent_context *ev, const char *location)
                return 0;
        }
 
+       if (ev->threaded_contexts != NULL) {
+               tevent_common_threaded_activate_immediate(ev);
+       }
+
        if (ev->immediate_events &&
            tevent_common_loop_immediate(ev)) {
                return 0;
index ec7565d00c3ece9c071ee29dc490df4fe4ee4b72..55dd0b66f66c52eb8fea3e3ed78e973ca643c6bb 100644 (file)
@@ -244,6 +244,10 @@ static int select_event_loop_once(struct tevent_context *ev, const char *locatio
                return 0;
        }
 
+       if (ev->threaded_contexts != NULL) {
+               tevent_common_threaded_activate_immediate(ev);
+       }
+
        if (ev->immediate_events &&
            tevent_common_loop_immediate(ev)) {
                return 0;
index 22b854c410f82d209f333ffbfd27aea8590a3671..e42759efd83a9064941e40cdfc8dd558db6d679d 100644 (file)
@@ -371,3 +371,122 @@ void tevent_thread_proxy_schedule(struct tevent_thread_proxy *tp,
        ;
 }
 #endif
+
+static int tevent_threaded_context_destructor(
+       struct tevent_threaded_context *tctx)
+{
+       if (tctx->event_ctx != NULL) {
+               DLIST_REMOVE(tctx->event_ctx->threaded_contexts, tctx);
+       }
+       return 0;
+}
+
+struct tevent_threaded_context *tevent_threaded_context_create(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+       struct tevent_threaded_context *tctx;
+       int ret;
+
+       ret = tevent_common_wakeup_init(ev);
+       if (ret != 0) {
+               errno = ret;
+               return NULL;
+       }
+
+       tctx = talloc(mem_ctx, struct tevent_threaded_context);
+       if (tctx == NULL) {
+               return NULL;
+       }
+       tctx->event_ctx = ev;
+
+       DLIST_ADD(ev->threaded_contexts, tctx);
+       talloc_set_destructor(tctx, tevent_threaded_context_destructor);
+
+       return tctx;
+#else
+       errno = ENOSYS;
+       return NULL;
+#endif
+}
+
+void _tevent_threaded_schedule_immediate(struct tevent_threaded_context *tctx,
+                                        struct tevent_immediate *im,
+                                        tevent_immediate_handler_t handler,
+                                        void *private_data,
+                                        const char *handler_name,
+                                        const char *location)
+{
+#ifdef HAVE_PTHREAD
+       struct tevent_context *ev = tctx->event_ctx;
+       int ret;
+
+       if ((im->event_ctx != NULL) || (handler == NULL)) {
+               abort();
+       }
+
+       im->event_ctx           = ev;
+       im->handler             = handler;
+       im->private_data        = private_data;
+       im->handler_name        = handler_name;
+       im->schedule_location   = location;
+       im->cancel_fn           = NULL;
+       im->additional_data     = NULL;
+
+       ret = pthread_mutex_lock(&ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       DLIST_ADD_END(ev->scheduled_immediates, im);
+
+       ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       /*
+        * We might want to wake up the main thread under the lock. We
+        * had a slightly similar situation in pthreadpool, changed
+        * with 1c4284c7395f23. This is not exactly the same, as the
+        * wakeup is only a last-resort thing in case the main thread
+        * is sleeping. Doing the wakeup under the lock can easily
+        * lead to a contended mutex, which is much more expensive
+        * than a noncontended one. So I'd opt for the lower footprint
+        * initially. Maybe we have to change that later.
+        */
+       tevent_common_wakeup(ev);
+#else
+       /*
+        * tevent_threaded_context_create() returned NULL with ENOSYS...
+        */
+       abort();
+#endif
+}
+
+void tevent_common_threaded_activate_immediate(struct tevent_context *ev)
+{
+#ifdef HAVE_PTHREAD
+       int ret;
+       ret = pthread_mutex_lock(&ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+
+       while (ev->scheduled_immediates != NULL) {
+               struct tevent_immediate *im = ev->scheduled_immediates;
+               DLIST_REMOVE(ev->scheduled_immediates, im);
+               DLIST_ADD_END(ev->immediate_events, im);
+       }
+
+       ret = pthread_mutex_unlock(&ev->scheduled_mutex);
+       if (ret != 0) {
+               abort();
+       }
+#else
+       /*
+        * tevent_threaded_context_create() returned NULL with ENOSYS...
+        */
+       abort();
+#endif
+}
index 71b9475c37035ff13c2ec4954b9081b4db8a9f12..3e0b4132e64d10b3f1895b42e8f696ebfe6dedae 100755 (executable)
@@ -99,9 +99,13 @@ def build(bld):
         private_library = True
 
     if not bld.CONFIG_SET('USING_SYSTEM_TEVENT'):
+        tevent_deps = 'replace talloc'
+        if bld.CONFIG_SET('HAVE_PTHREAD'):
+            tevent_deps += ' pthread'
+
         bld.SAMBA_LIBRARY('tevent',
                           SRC,
-                          deps='replace talloc',
+                          deps=tevent_deps,
                           enabled= not bld.CONFIG_SET('USING_SYSTEM_TEVENT'),
                           includes='.',
                           abi_directory='ABI',