bla
authorStefan Metzmacher <metze@samba.org>
Thu, 28 Jul 2016 16:41:54 +0000 (18:41 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:51:50 +0000 (09:51 +0200)
lib/tevent/tevent.h
lib/tevent/tevent_internal.h
lib/tevent/tevent_threads.c

index 02c1918950b37115fa900f47e7c06d0ce456aee0..2825ec79b569ae07b4b31e8f71cc0e4dc8b99760 100644 (file)
@@ -1976,6 +1976,7 @@ struct tevent_threadpool;
  * requests in a helper threadpool.
  */
 
+#ifdef DOXYGEN
 /**
  * @brief Create a tevent_threadpool
  *
@@ -1993,6 +1994,16 @@ struct tevent_threadpool;
 struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
                                                   struct tevent_context *ev,
                                                   int max_threads);
+#else
+struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+                                                   struct tevent_context *ev,
+                                                   int max_threads,
+                                                   const char *location);
+#define tevent_threadpool_create(mem_ctx, ev, max_threads) \
+       _tevent_threadpool_job_create((mem_ctx), (ev), (max_threads), \
+                                     __location__)
+
+#endif
 
 /**
  * @brief Destroy a tevent_threadpool
@@ -2022,9 +2033,7 @@ int tevent_threadpool_destroy(struct tevent_threadpool *pool);
  * @param[in] type      The name of the job.
  */
 struct tevent_threadpool_job *tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
-                                                          #type,
-                                                          int (*fn)(type *state),
-                                                          struct tevent_threadpool_job **pjob,
+                                                          #name,
                                                           void **pstate);
 #else
 struct tevent_threadpool_job_description {
@@ -2034,59 +2043,34 @@ struct tevent_threadpool_job_description {
 };
 
 #define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(__name) \
-static const union { \
-       struct tevent_threadpool_job_description generic; \
-       struct { \
-               const char *type; \
-               size_t type_size; \
-               int (*fn)(struct __name##_arg *arg); \
-       } specific; \
-} __name##_description = { \
-       .specific = { \
-               .type = "struct " #__name "_arg", \
-               .type_size = sizeof(struct __name##_arg), \
-               .fn = __name##_fn, \
-       }, \
-};
-#define TEVENT_THREADPOOL_JOB_PTR(fn) \
-       (&tevent_threadpool_job##fn.generic)
+static const struct tevent_threadpool_job_description *__name##_description(void) \
+{ \
+       static const union { \
+               struct tevent_threadpool_job_description generic; \
+               struct { \
+                       const char *type; \
+                       size_t type_size; \
+                       int (*fn)(struct __name##_args *args); \
+               } specific; \
+       } description = { \
+               .specific = { \
+                       .type = "struct " #__name "_args", \
+                       .type_size = sizeof(struct __name##_args), \
+                       .fn = __name##_fn, \
+               }, \
+       }; \
+ \
+       return &description.generic; \
+}
 
 struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
                                                             const struct tevent_threadpool_job_description *desc,
                                                             void *pstate,
                                                             const char *location);
-struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
-                                                           int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
-                                                           void *pstate,
-                                                           size_t state_size,
-                                                           const char *type,
-                                                           const char *location);
-
-#if (__GNUC__ >= 3)
-#if 0
-#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) ( \
-       int (*__function_ptr)(type *state) = (fn), \
-       _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
-                                     sizeof(type), #type, __location))
-#define __tevent_threadpool_job_create(mem_ctx, type, fn, pjob, pstate) do { \
-       int (*__function_ptr)(type *) = (fn); \
-       *pjob = _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
-                                     sizeof(type), #type, __location__); \
 
-#endif
-#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
-       _tevent_threadpool_job_create((mem_ctx), \
-               ( (\
-                       ((union { \
-                               int (*generic_fn)(void *state); \
-                               int (*specific_fn)(type *state); \
-                       }) { .specific_fn = fn, })),\
-               (pstate), sizeof(type), #type, __location__)
-#else
-#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
-       _tevent_threadpool_job_create((mem_ctx), (int *)(void *)(fn), (pstate), \
-                                     sizeof(type), #type, __location__)
-#endif
+#define tevent_threadpool_job_create(mem_ctx, name, pstate) \
+       _tevent_threadpool_job_create((mem_ctx), __name##_description(), \
+                                     (pstate), __location__)
 
 #endif
 
index 1ada1b5da558ae27019fb6c1a3776345680e330d..c35fd646d2511b29144c8f9f9c7df841bde4dfb7 100644 (file)
@@ -330,6 +330,10 @@ struct tevent_context {
                struct tevent_wrapper_glue *glue;
        } wrapper;
 
+       struct {
+               struct tevent_threadpool *pools;
+       } threads;
+
        /*
         * an optimization pointer into timer_events
         * used by used by common code via
index 5c45615bca63673faaa403a8aae1977c3a9341bd..7cc57c1e7d8d90178f0eed9bfcd1a130015b4c5f 100644 (file)
@@ -990,9 +990,10 @@ struct tevent_threadpool_job {
 };
 
 struct tevent_threadpool {
+       struct tevent_threadpool *prev, *next;
        struct tevent_context *ev;
 
-       size_t max_jobs;
+       size_t max_threads;
        struct tevent_threadpool_job *jobs;
        struct tevent_queue *job_queue;
 
@@ -1004,6 +1005,11 @@ static void tevent_threadpool_handler(struct tevent_context *ev,
 
 static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
 {
+       tevent_queue_stop(pool->job_queue);
+
+       //TODO DLIST_REMOVE all pool->jobs
+
+#if 0
        printf("tevent_threadpool_destructor\n");
        while (talloc_array_length(pool->pending) != 0) {
                /* No TALLOC_FREE here */
@@ -1020,38 +1026,48 @@ static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
 
        threadpool_destroy(pool->threadpool);
        pool->threadpool = NULL;
-
+#endif
        return 0;
 }
 
-struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
-                                                  struct tevent_context *ev,
-                                                  int max_threads)
+struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+                                                   struct tevent_context *ev,
+                                                   int max_threads,
+                                                   const char *location)
 {
        struct tevent_threadpool *pool;
 
-       pool = talloc_zero(mem_ctx, struct tevent_threadpool);
-       if (pool == NULL) {
+       if (ev->wrapper.glue != NULL) {
+               /*
+                * stacking of wrappers is not supported
+                */
+               tevent_debug(ev->wrapper.glue->main_ev, TEVENT_DEBUG_FATAL,
+                            "%s: %s() stacking not allowed",
+                            __func__, location);
+               errno = ENOSYS;
                return NULL;
        }
-       pool->ev = ev;
 
-       pool->threadpool = threadpool_init(pool, max_threads);
-       if (pool->threadpool == NULL) {
-               TALLOC_FREE(pool);
+       pool = talloc_zero(mem_ctx, struct tevent_threadpool);
+       if (pool == NULL) {
                return NULL;
        }
-       talloc_set_destructor(pool, tevent_threadpool_destructor);
+       *pool = (struct tevent_threadpool) {
+               .ev = ev,
+               .max_threads = max_threads,
+       };
 
-       pool->sig_fd = threadpool_signal_fd(pool->threadpool);
-       if (pool->sig_fd == -1) {
+       pool->job_queue = tevent_queue_create(pool, "job_queue");
+       if (pool->job_queue == NULL) {
                TALLOC_FREE(pool);
                return NULL;
        }
 
+       DLIST_ADD_END(ev->threads.pools, pool);
+
        return pool;
 }
-
+#if 0
 static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
 {
        int num_pending = talloc_array_length(pool->pending);
@@ -1184,7 +1200,7 @@ static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
        pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
                                      num_pending - 1);
 }
-
+#endif
 static void tevent_threadpool_req_cleanup(struct tevent_req *req,
                                          enum tevent_req_state req_state)
 {
@@ -1192,23 +1208,23 @@ static void tevent_threadpool_req_cleanup(struct tevent_req *req,
                req, struct tevent_threadpool_state);
        //struct tevent_threadpool *pool = state->pool;
 
-       printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
+       //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
 
        switch (req_state) {
        case TEVENT_REQ_RECEIVED:
                break;
        default:
-               printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
+       //      printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
                return;
        }
 
        if (state->done) {
-               printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+       //      printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
                tevent_threadpool_req_unset_pending(req);
                return;
        }
 
-       printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+       //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
 #if 0
        threadpool_job_cancel(state->job);
 
@@ -1284,7 +1300,7 @@ struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
        return NULL;
 }
 
-struct metze_thread_state {
+struct metze_thread_args {
        uint8_t tmp;
 };
 
@@ -1293,7 +1309,7 @@ static int metze_thread_fn(struct metze_thread_state *state)
        return 0;
 }
 
-TEVENT_THREADPOOL_JOB_DECL(metze_thread_fn, struct metze_thread_state);
+TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
 
 struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
                                              struct tevent_context *ev,