next
authorStefan Metzmacher <metze@samba.org>
Thu, 28 Jul 2016 15:41:49 +0000 (17:41 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:51:50 +0000 (09:51 +0200)
lib/tevent/tevent.h
lib/tevent/tevent_threads.c

index 45e5bb9e4380b8091c2feb5d5695282cc70cce3e..02c1918950b37115fa900f47e7c06d0ce456aee0 100644 (file)
@@ -97,11 +97,6 @@ typedef void (*tevent_signal_handler_t)(struct tevent_context *ev,
                                        void *siginfo,
                                        void *private_data);
 
-/**
- * Called when a threaded job is schedule to run in a thread
- */
-typedef void (*tevent_threadpool_fn_t)(void *private_data);
-
 /**
  * @brief Create a event_context structure.
  *
@@ -2014,24 +2009,85 @@ int tevent_threadpool_destroy(struct tevent_threadpool *pool);
 /**
  * @brief Create a job to run in a threadpool
  *
+ * Only one instance of the job can run at a time,
+ * but the job can be reused multiple times.
+ *
+  If you want to run multiple instances of similar jobs
+ * at the same time, you need to create a tevent_threadpool_job
+ * for each instance.
+ *
  * @param[in] mem_ctx   The memory context for the result.
- * @param[in] pstate    Pointer to the private request state.
+ * @param[in] fn        The function to execute the job (must be thread safe!)
+ * @param[in] pstate    Pointer to the request state.
  * @param[in] type      The name of the job.
  */
 struct tevent_threadpool_job *tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
-                                                          tevent_threadpool_fn_t fn,
-                                                          void **pstate, #type);
+                                                          #type,
+                                                          int (*fn)(type *state),
+                                                          struct tevent_threadpool_job **pjob,
+                                                          void **pstate);
 #else
+struct tevent_threadpool_job_description {
+       const char *type;
+       size_t type_size;
+       int (*fn)(void *arg);
+};
+
+#define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(__name) \
+static const union { \
+       struct tevent_threadpool_job_description generic; \
+       struct { \
+               const char *type; \
+               size_t type_size; \
+               int (*fn)(struct __name##_arg *arg); \
+       } specific; \
+} __name##_description = { \
+       .specific = { \
+               .type = "struct " #__name "_arg", \
+               .type_size = sizeof(struct __name##_arg), \
+               .fn = __name##_fn, \
+       }, \
+};
+#define TEVENT_THREADPOOL_JOB_PTR(fn) \
+       (&tevent_threadpool_job##fn.generic)
+
+struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
+                                                            const struct tevent_threadpool_job_description *desc,
+                                                            void *pstate,
+                                                            const char *location);
 struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
-                                                           tevent_threadpool_fn_t fn,
+                                                           int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
                                                            void *pstate,
                                                            size_t state_size,
                                                            const char *type,
                                                            const char *location);
 
+#if (__GNUC__ >= 3)
+#if 0
+#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) ( \
+       int (*__function_ptr)(type *state) = (fn), \
+       _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
+                                     sizeof(type), #type, __location))
+#define __tevent_threadpool_job_create(mem_ctx, type, fn, pjob, pstate) do { \
+       int (*__function_ptr)(type *) = (fn); \
+       *pjob = _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
+                                     sizeof(type), #type, __location__); \
+
+#endif
+#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
+       _tevent_threadpool_job_create((mem_ctx), \
+               ( (\
+                       ((union { \
+                               int (*generic_fn)(void *state); \
+                               int (*specific_fn)(type *state); \
+                       }) { .specific_fn = fn, })),\
+               (pstate), sizeof(type), #type, __location__)
+#else
 #define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
-       _tevent_threadpool_job_create((mem_ctx), (fn), (pstate), \
-                                     sizeof(type), #type, __location)
+       _tevent_threadpool_job_create((mem_ctx), (int *)(void *)(fn), (pstate), \
+                                     sizeof(type), #type, __location__)
+#endif
+
 #endif
 
 /**
@@ -2043,10 +2099,10 @@ struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
  * @param[in]  job      The job to run
  * @return              tevent request on sucess, NULL on failure
  */
-struct tevent_req *tevent_threadpool_send(TALLOC_CTX *mem_ctx,
-                                         struct tevent_context *ev,
-                                         struct tevent_threadpool *pool,
-                                         struct tevent_threadpool_job *job);
+struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
+                                             struct tevent_context *ev,
+                                             struct tevent_threadpool *pool,
+                                             struct tevent_threadpool_job *job);
 
 /**
  * @brief Get the result of a computation
@@ -2055,7 +2111,7 @@ struct tevent_req *tevent_threadpool_send(TALLOC_CTX *mem_ctx,
  * @param[out] perror          errno in case of failure
  * @return                     0 on sucess or -1 if an error occured
  */
-int tevent_threadpool_recv(struct tevent_req *req, int *perror);
+int tevent_threadpool_job_recv(struct tevent_req *req, int *perror);
 
 /**
  * @defgroup tevent_wrapper_ops The tevent wrapper operation functions
index 43d5a3e457c81847d1d209c0fd7196b154bc7b7d..5c45615bca63673faaa403a8aae1977c3a9341bd 100644 (file)
@@ -441,6 +441,7 @@ static pthread_once_t threadpool_atfork_initialized = PTHREAD_ONCE_INIT;
 
 static void threadpool_prep_atfork(void);
 
+#if 0
 /*
  * Initialize a thread pool
  */
@@ -589,7 +590,6 @@ static void threadpool_job_cancel(struct threadpool_job *job)
 {
        job->cancel = true;
 }
-
 static bool threadpool_job_is_cancelled(struct threadpool_job *job)
 {
        return job->cancel;
@@ -966,34 +966,36 @@ static struct threadpool_job *threadpool_add_job(TALLOC_CTX *mem_ctx,
        return job;
 }
 
+#endif
 /*******************************************************************
  * tvent_threadpool_send()/recv() sugar
  *******************************************************************/
 
 struct tevent_threadpool_job {
-       tevent_threadpool_fn_t fn;
-       void *private_data;
-};
-
-struct tevent_threadpool_state {
+       struct tevent_threadpool_job *prev, *next;
        struct tevent_threadpool *pool;
-       int job_id;
-       bool done;
-       struct tevent_threadpool_job *job;
-       struct threadpool_job *threadpool_job;
+
+       struct {
+               struct tevent_immediate *im;
+               bool busy;
+               bool done;
+               int ret;
+               //bool cancel;
+       } internal;
+
+       struct {
+               const struct tevent_threadpool_job_description *desc;
+               void *arg;
+       } state;
 };
 
 struct tevent_threadpool {
        struct tevent_context *ev;
-       struct threadpool *threadpool;
-       int next_job_id;
-       int sig_fd;
-       struct tevent_req **pending;
 
-       struct tevent_threadpool_state **orphaned;
-       int num_orphaned;
+       size_t max_jobs;
+       struct tevent_threadpool_job *jobs;
+       struct tevent_queue *job_queue;
 
-       struct tevent_fd *fde;
 };
 
 static void tevent_threadpool_handler(struct tevent_context *ev,
@@ -1188,7 +1190,7 @@ static void tevent_threadpool_req_cleanup(struct tevent_req *req,
 {
        struct tevent_threadpool_state *state = tevent_req_data(
                req, struct tevent_threadpool_state);
-       struct tevent_threadpool *pool = state->pool;
+       //struct tevent_threadpool *pool = state->pool;
 
        printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
 
@@ -1207,7 +1209,7 @@ static void tevent_threadpool_req_cleanup(struct tevent_req *req,
        }
 
        printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
-
+#if 0
        threadpool_job_cancel(state->job);
 
        /*
@@ -1219,51 +1221,97 @@ static void tevent_threadpool_req_cleanup(struct tevent_req *req,
        tevent_threadpool_req_unset_pending(req);
        pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
                                                             &req->data);
+#endif
 }
 
-struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
-                                                           tevent_threadpool_fn_t fn,
-                                                           void *pdata,
-                                                           size_t data_size,
-                                                           const char *type,
-                                                           const char *location)
+static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
+{
+       if (state->pool == NULL) {
+               return 0;
+       }
+
+       DLIST_REMOVE(job->pool, job);
+       job->pool = NULL;
+       return 0;
+}
+
+struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
+                                                            const struct tevent_threadpool_job_description *desc,
+                                                            void *pstate,
+                                                            const char *location)
 {
        struct tevent_threadpool_job *job;
-       void **ppdata = (void **)pdata;
-       void *data;
+       void **ppstate = (void **)pstate;
+       void *state;
+       size_t payload;
 
-       job = talloc_pooled_object(mem_ctx, tevent_threadpool_job, 1,
-                                  data_size);
-       if (job == NULL) {
+       payload = sizeof(struct tevent_immediate) + desc->type_size;
+       if (payload < sizeof(struct tevent_immediate)) {
+               /* overflow */
                return NULL;
        }
-       ZERO_STRUCTP(job);
 
-       job->fn = fn;
-
-       data = talloc_zero_size(job, data_size);
-       if (data == NULL) {
-               talloc_free(job);
+       job = talloc_pooled_object(mem_ctx, struct tevent_threadpool_job,
+                                  2, payload);
+       if (job == NULL) {
                return NULL;
        }
-       talloc_set_name_const(job, type);
 
-       job->data = data;
+       *job = (struct tevent_threadpool_job) {
+               .internal = {
+                       .im = tevent_create_immediate(job),
+               },
+               .state = {
+                       .desc = desc,
+                       .arg = talloc_zero_size(job, desc->type_size),
+               },
+       };
+       talloc_set_name_const(job->state.arg, desc->type);
 
-       talloc_set_destructor(job, tevent_threaded_job_destructor);
+       talloc_set_destructor(job, tevent_threadpool_job_destructor);
 
-       *ppdata = data;
+       *ppstate = state;
        return job;
 }
 
-struct tevent_req *tevent_threadpool_send(TALLOC_CTX *mem_ctx,
-                                         struct tevent_context *ev,
-                                         struct tevent_threadpool *pool,
-                                         struct tevent_threadpool_job *job)
+struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
+                                                           int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
+                                                           void *pstate,
+                                                           size_t state_size,
+                                                           const char *type,
+                                                           const char *location)
+{
+       return NULL;
+}
+
+struct metze_thread_state {
+       uint8_t tmp;
+};
+
+static int metze_thread_fn(struct metze_thread_state *state)
+{
+       return 0;
+}
+
+TEVENT_THREADPOOL_JOB_DECL(metze_thread_fn, struct metze_thread_state);
+
+struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
+                                             struct tevent_context *ev,
+                                             struct tevent_threadpool *pool,
+                                             struct tevent_threadpool_job *job)
 {
        struct tevent_req *req;
        struct tevent_threadpool_state *state;
+#if 0
+struct tevent_threadpool_job *j;
+struct metze_thread_state *jstate;
+
+       j = tevent_threadpool_job_create(mem_ctx, metze_thread_fn,
+                                    &jstate, struct metze_thread_state);
+       if (j == NULL) {
 
+       }
+#endif
        req = tevent_req_create(mem_ctx, &state, struct tevent_threadpool_state);
        if (req == NULL) {
                return NULL;
@@ -1275,11 +1323,12 @@ struct tevent_req *tevent_threadpool_send(TALLOC_CTX *mem_ctx,
 
        printf("tevent_threadpool_send: sheduling job %d\n", state->job_id);
 
+
        state->threadpool_job = threadpool_add_job(state,
                                                   state->pool->threadpool,
                                                   state->job_id,
                                                   state->job->fn,
-                                                  state->job->data);
+                                                  state->job->state);
        if (state->threadpool_job == NULL) {
                printf("tevent_threadpool_send: job %d state->job == NULL\n", state->job_id);
                tevent_req_error(req, errno);
@@ -1355,7 +1404,7 @@ static void tevent_threadpool_handler(struct tevent_context *ev,
        TALLOC_FREE(orphaned_state);
 }
 
-int tevent_threadpool_recv(struct tevent_req *req, int *perrno)
+int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
 {
        enum tevent_req_state req_state;
        uint64_t error;