more
authorStefan Metzmacher <metze@samba.org>
Thu, 28 Jul 2016 17:34:38 +0000 (19:34 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:51:51 +0000 (09:51 +0200)
lib/tevent/tevent.h
lib/tevent/tevent_metze.c

index 996991a5d20c2dda54c09d0857187efae1537632..901cd0e96c90ddd63b112e01eee0b3f20d79fcf5 100644 (file)
@@ -2000,8 +2000,8 @@ struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
                                                    int max_threads,
                                                    const char *location);
 #define tevent_threadpool_create(mem_ctx, ev, max_threads) \
-       _tevent_threadpool_job_create((mem_ctx), (ev), (max_threads), \
-                                     __location__)
+       _tevent_threadpool_create((mem_ctx), (ev), (max_threads), \
+                                 __location__)
 
 #endif
 
@@ -2037,26 +2037,26 @@ struct tevent_threadpool_job *tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
                                                           void **pstate);
 #else
 struct tevent_threadpool_job_description {
-       const char *type;
-       size_t type_size;
-       int (*fn)(void *arg);
+       int (*fn)(void *args);
+       const char *args_type;
+       size_t args_size;
 };
 
-#define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(__name) \
-static const struct tevent_threadpool_job_description *__name##_description(void) \
+#define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(__fn) \
+static const struct tevent_threadpool_job_description *__fn##_description(void) \
 { \
        static const union { \
                struct tevent_threadpool_job_description generic; \
                struct { \
-                       const char *type; \
-                       size_t type_size; \
-                       int (*fn)(struct __name##_args *args); \
+                       int (*fn)(struct __fn##_args *args); \
+                       const char *args_type; \
+                       size_t args_size; \
                } specific; \
        } description = { \
                .specific = { \
-                       .type = "struct " #__name "_args", \
-                       .type_size = sizeof(struct __name##_args), \
-                       .fn = __name##_fn, \
+                       .fn = __fn, \
+                       .args_type = "struct " #__fn "_args", \
+                       .args_size = sizeof(struct __fn##_args), \
                }, \
        }; \
  \
@@ -2065,12 +2065,12 @@ static const struct tevent_threadpool_job_description *__name##_description(void
 
 struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
                                                            const struct tevent_threadpool_job_description *desc,
-                                                           void *pstate,
+                                                           void *pargs,
                                                            const char *location);
 
-#define tevent_threadpool_job_create(mem_ctx, name, pstate) \
-       _tevent_threadpool_job_create((mem_ctx), __name##_description(), \
-                                     (pstate), __location__)
+#define tevent_threadpool_job_create(mem_ctx, __fn, pargs) \
+       _tevent_threadpool_job_create((mem_ctx), __fn##_description(), \
+                                     (pargs), __location__)
 
 #endif
 
index 811036dcd4dc6605329964b76fbafcdddae205c6..8da6dc7787b7bad7a79adfa78267ce5f05a269c2 100644 (file)
@@ -1,7 +1,7 @@
 /*
    tevent event library.
 
-   Copyright (C) Jeremy Allison 2015
+   Copyright (C) Stefan Metzmacher 2016
 
      ** NOTE! The following LGPL license applies to the tevent
      ** library. This does NOT imply that all of Samba is released
@@ -35,19 +35,21 @@ struct tevent_threadpool;
 
 struct tevent_threadpool_job {
        struct tevent_threadpool_job *prev, *next;
-       struct tevent_threadpool *pool;
+
+       struct {
+               struct tevent_threadpool *pool;
+               struct tevent_req *req;
+       } busy;
 
        struct {
                struct tevent_immediate *im;
-               bool busy;
-               bool done;
                int ret;
                //bool cancel;
        } internal;
 
        struct {
                const struct tevent_threadpool_job_description *desc;
-               void *arg;
+               void *args;
        } state;
 };
 
@@ -61,34 +63,12 @@ struct tevent_threadpool {
 
 };
 
-static void tevent_threadpool_handler(struct tevent_context *ev,
-                                     struct tevent_fd *fde,
-                                     uint16_t flags, void *private_data);
-
 static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
 {
        tevent_queue_stop(pool->job_queue);
        
        //TODO DLIST_REMOVE all pool->jobs
 
-#if 0
-       printf("tevent_threadpool_destructor\n");
-       while (talloc_array_length(pool->pending) != 0) {
-               /* No TALLOC_FREE here */
-               talloc_free(pool->pending[0]);
-       }
-
-       while (pool->num_orphaned != 0) {
-               /*
-                * We've got jobs in the queue for which the tevent_req has
-                * been finished already. Wait for all of them to finish.
-                */
-               tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
-       }
-
-       threadpool_destroy(pool->threadpool);
-       pool->threadpool = NULL;
-#endif
        return 0;
 }
 
@@ -127,203 +107,35 @@ struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
 
        DLIST_ADD_END(ev->threads.pools, pool);
 
+       talloc_set_destructor(pool, tevent_threadpool_destructor);
        return pool;
 }
-#if 0
-static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
-{
-       int num_pending = talloc_array_length(pool->pending);
-       int result;
-
-       while (true) {
-               int i;
-
-               result = pool->next_job_id++;
-               if (result == 0) {
-                       continue;
-               }
-
-               for (i = 0; i < num_pending; i++) {
-                       struct tevent_threadpool_state *state = tevent_req_data(
-                               pool->pending[i], struct tevent_threadpool_state);
-
-                       if (result == state->job_id) {
-                               break;
-                       }
-               }
-               if (i == num_pending) {
-                       return result;
-               }
-       }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
-                                         enum tevent_req_state req_state);
 
-static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
-                                             struct tevent_threadpool *pool,
-                                             struct tevent_context *ev)
-{
-       struct tevent_req **pending;
-       int num_pending, orphaned_array_length;
-
-       num_pending = talloc_array_length(pool->pending);
-       printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
-               num_pending);
-       pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
-                                num_pending + 1);
-       if (pending == NULL) {
-               return false;
-       }
-       pending[num_pending] = req;
-       num_pending++;
-       pool->pending = pending;
-       tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
-
-       /*
-        * Make sure that the orphaned array of
-        * tevent_threadpool_state structs has enough space. A job can
-        * change from pending to orphaned in
-        * tevent_threadpool_cleanup, and to fail in a talloc
-        * destructor should be avoided if possible.
-        */
-
-       orphaned_array_length = talloc_array_length(pool->orphaned);
-       if (num_pending > orphaned_array_length) {
-               struct tevent_threadpool_state **orphaned;
-
-               orphaned = talloc_realloc(pool, pool->orphaned,
-                                         struct tevent_threadpool_state *,
-                                         orphaned_array_length + 1);
-               if (orphaned == NULL) {
-                       tevent_threadpool_req_unset_pending(req);
-                       return false;
-               }
-               pool->orphaned = orphaned;
-       }
-
-       if (pool->fde != NULL) {
-               return true;
-       }
-
-       pool->fde = tevent_add_fd(pool->ev,
-                                 pool,
-                                 pool->sig_fd, TEVENT_FD_READ,
-                                 tevent_threadpool_handler, pool);
-       if (pool->fde == NULL) {
-               tevent_threadpool_req_unset_pending(req);
-               return false;
-       }
-       return true;
-}
-
-static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
-{
-       struct tevent_threadpool *pool = state->pool;
-       int num_pending = talloc_array_length(pool->pending);
-
-       if (num_pending == 0 && pool->num_orphaned == 0) {
-               printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
-               TALLOC_FREE(pool->fde);
-       }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
-{
-       struct tevent_threadpool_state *state = tevent_req_data(
-               req, struct tevent_threadpool_state);
-       struct tevent_threadpool *pool = state->pool;
-       int num_pending = talloc_array_length(pool->pending);
-       int i;
-
-       tevent_req_set_cleanup_fn(req, NULL);
-
-       if (num_pending == 1) {
-               if (pool->num_orphaned == 0) {
-                       printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
-                       TALLOC_FREE(pool->fde);
-               }
-               TALLOC_FREE(pool->pending);
-               return;
-       }
-
-       for (i = 0; i < num_pending; i++) {
-               if (req == pool->pending[i]) {
-                       break;
-               }
-       }
-       if (i == num_pending) {
-               return;
-       }
-       if (num_pending > 1) {
-               pool->pending[i] = pool->pending[num_pending - 1];
-       }
-       pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
-                                     num_pending - 1);
-}
-#endif
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
-                                         enum tevent_req_state req_state)
+static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
 {
-#if 0
-       struct tevent_threadpool_state *state = tevent_req_data(
-               req, struct tevent_threadpool_state);
-       //struct tevent_threadpool *pool = state->pool;
-
-       //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
-
-       switch (req_state) {
-       case TEVENT_REQ_RECEIVED:
-               break;
-       default:
-       //      printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
-               return;
+       if (job->busy.req != NULL) {
+               tevent_req_received(job->busy.req);
+               job->busy.req = NULL;
        }
 
-       if (state->done) {
-       //      printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
-               tevent_threadpool_req_unset_pending(req);
-               return;
+       if (job->busy.pool != NULL) {
+               DLIST_REMOVE(job->busy.pool->jobs, job);
+               job->busy.pool = NULL;
        }
 
-       //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
-       threadpool_job_cancel(state->job);
-
-       /*
-        * Keep around the state of the deleted request until the
-        * request has finished in the helper
-        * thread. tevent_threadpool_handler will destroy it.
-        */
-       pool->num_orphaned++;
-       tevent_threadpool_req_unset_pending(req);
-       pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
-                                                            &req->data);
-#endif
-}
-
-static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
-{
-       if (job->pool == NULL) {
-               return 0;
-       }
-
-       //DLIST_REMOVE(job->pool, job);
-       job->pool = NULL;
        return 0;
 }
 
 struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
                                                            const struct tevent_threadpool_job_description *desc,
-                                                           void *pstate,
+                                                           void *pargs,
                                                            const char *location)
 {
        struct tevent_threadpool_job *job;
-       void **ppstate = (void **)pstate;
-       void *state;
+       void **ppargs = (void **)pargs;
        size_t payload;
 
-       payload = sizeof(struct tevent_immediate) + desc->type_size;
+       payload = sizeof(struct tevent_immediate) + desc->args_size;
        if (payload < sizeof(struct tevent_immediate)) {
                /* overflow */
                return NULL;
@@ -341,27 +153,28 @@ struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
                },
                .state = {
                        .desc = desc,
-                       .arg = talloc_zero_size(job, desc->type_size),
+                       .args = talloc_zero_size(job, desc->args_size),
                },
        };
-       talloc_set_name_const(job->state.arg, desc->type);
+       talloc_set_name_const(job->state.args, desc->args_type);
 
        talloc_set_destructor(job, tevent_threadpool_job_destructor);
 
-       *ppstate = state;
+       *ppargs = job->state.args;
        return job;
 }
 
-struct metze_thread_args {
+
+struct metze_thread_fn_args {
        uint8_t tmp;
 };
 
-static int metze_thread_fn(struct metze_thread_args *args)
+static int metze_thread_fn(struct metze_thread_fn_args *args)
 {
        return 0;
 }
 
-TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
+TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread_fn)
 
 struct tevent_threadpool_job_state {
        struct tevent_threadpool_job *job;
@@ -373,6 +186,11 @@ static void tevent_threadpool_job_cleanup(struct tevent_req *req,
        struct tevent_threadpool_job_state *state =
                tevent_req_data(req,
                struct tevent_threadpool_job_state);
+       struct tevent_threadpool_job *job = state->job;
+
+       if (job == NULL) {
+               return;
+       }
 
        switch (req_state) {
        case TEVENT_REQ_RECEIVED:
@@ -381,27 +199,11 @@ static void tevent_threadpool_job_cleanup(struct tevent_req *req,
                return;
        }
 
+       state->job = NULL;
 
-       if (state->done) {
-       //      printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
-               tevent_threadpool_req_unset_pending(req);
-               return;
-       }
-
-       //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
-       threadpool_job_cancel(state->job);
-
-       /*
-        * Keep around the state of the deleted request until the
-        * request has finished in the helper
-        * thread. tevent_threadpool_handler will destroy it.
-        */
-       pool->num_orphaned++;
-       tevent_threadpool_req_unset_pending(req);
-       pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
-                                                            &req->data);
-#endif
+       job->busy.req = NULL;
 }
+
 struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
                                              struct tevent_context *ev,
                                              struct tevent_threadpool *pool,
@@ -409,6 +211,8 @@ struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
 {
        struct tevent_req *req;
        struct tevent_threadpool_job_state *state;
+       struct tevent_threadpool_job *j;
+       struct metze_thread_fn_args *args;
 
        req = tevent_req_create(mem_ctx, &state,
                                struct tevent_threadpool_job_state);
@@ -417,14 +221,18 @@ struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
        }
        state->job = job;
 
+       tevent_req_set_cleanup_fn(req, tevent_threadpool_job_cleanup);
+
+       j = tevent_threadpool_job_create(state, metze_thread_fn, &args);
+       if (tevent_req_nomem(j, req)) {
+               return tevent_req_post(req, ev);
+       }
+
        return req;
 }
 
 int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
 {
-       struct tevent_threadpool_job_state *state =
-               tevent_req_data(req,
-               struct tevent_threadpool_job_state);
        enum tevent_req_state req_state;
        uint64_t error;