void *siginfo,
void *private_data);
-/**
- * Called when a threaded job is schedule to run in a thread
- */
-typedef void (*tevent_threadpool_fn_t)(void *private_data);
-
/**
* @brief Create a event_context structure.
*
/**
* @brief Create a job to run in a threadpool
*
+ * Only one instance of the job can run at a time,
+ * but the job can be reused multiple times.
+ *
+ If you want to run multiple instances of similar jobs
+ * at the same time, you need to create a tevent_threadpool_job
+ * for each instance.
+ *
* @param[in] mem_ctx The memory context for the result.
- * @param[in] pstate Pointer to the private request state.
+ * @param[in] fn The function to execute the job (must be thread safe!)
+ * @param[in] pstate Pointer to the request state.
* @param[in] type The name of the job.
*/
struct tevent_threadpool_job *tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
- tevent_threadpool_fn_t fn,
- void **pstate, #type);
+ #type,
+ int (*fn)(type *state),
+ struct tevent_threadpool_job **pjob,
+ void **pstate);
#else
+struct tevent_threadpool_job_description {
+ const char *type;
+ size_t type_size;
+ int (*fn)(void *arg);
+};
+
+#define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(__name) \
+static const union { \
+ struct tevent_threadpool_job_description generic; \
+ struct { \
+ const char *type; \
+ size_t type_size; \
+ int (*fn)(struct __name##_arg *arg); \
+ } specific; \
+} __name##_description = { \
+ .specific = { \
+ .type = "struct " #__name "_arg", \
+ .type_size = sizeof(struct __name##_arg), \
+ .fn = __name##_fn, \
+ }, \
+};
+#define TEVENT_THREADPOOL_JOB_PTR(fn) \
+ (&tevent_threadpool_job##fn.generic)
+
+struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
+ const struct tevent_threadpool_job_description *desc,
+ void *pstate,
+ const char *location);
struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
- tevent_threadpool_fn_t fn,
+ int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
void *pstate,
size_t state_size,
const char *type,
const char *location);
+#if (__GNUC__ >= 3)
+#if 0
+#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) ( \
+ int (*__function_ptr)(type *state) = (fn), \
+ _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
+ sizeof(type), #type, __location))
+#define __tevent_threadpool_job_create(mem_ctx, type, fn, pjob, pstate) do { \
+ int (*__function_ptr)(type *) = (fn); \
+ *pjob = _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
+ sizeof(type), #type, __location__); \
+
+#endif
+#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
+ _tevent_threadpool_job_create((mem_ctx), \
+ ( (\
+ ((union { \
+ int (*generic_fn)(void *state); \
+ int (*specific_fn)(type *state); \
+ }) { .specific_fn = fn, })),\
+ (pstate), sizeof(type), #type, __location__)
+#else
#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
- _tevent_threadpool_job_create((mem_ctx), (fn), (pstate), \
- sizeof(type), #type, __location)
+ _tevent_threadpool_job_create((mem_ctx), (int *)(void *)(fn), (pstate), \
+ sizeof(type), #type, __location__)
+#endif
+
#endif
/**
* @param[in] job The job to run
* @return tevent request on sucess, NULL on failure
*/
-struct tevent_req *tevent_threadpool_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- struct tevent_threadpool *pool,
- struct tevent_threadpool_job *job);
+struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tevent_threadpool *pool,
+ struct tevent_threadpool_job *job);
/**
* @brief Get the result of a computation
* @param[out] perror errno in case of failure
* @return 0 on sucess or -1 if an error occured
*/
-int tevent_threadpool_recv(struct tevent_req *req, int *perror);
+int tevent_threadpool_job_recv(struct tevent_req *req, int *perror);
/**
* @defgroup tevent_wrapper_ops The tevent wrapper operation functions
static void threadpool_prep_atfork(void);
+#if 0
/*
* Initialize a thread pool
*/
{
job->cancel = true;
}
-
static bool threadpool_job_is_cancelled(struct threadpool_job *job)
{
return job->cancel;
return job;
}
+#endif
/*******************************************************************
* tvent_threadpool_send()/recv() sugar
*******************************************************************/
struct tevent_threadpool_job {
- tevent_threadpool_fn_t fn;
- void *private_data;
-};
-
-struct tevent_threadpool_state {
+ struct tevent_threadpool_job *prev, *next;
struct tevent_threadpool *pool;
- int job_id;
- bool done;
- struct tevent_threadpool_job *job;
- struct threadpool_job *threadpool_job;
+
+ struct {
+ struct tevent_immediate *im;
+ bool busy;
+ bool done;
+ int ret;
+ //bool cancel;
+ } internal;
+
+ struct {
+ const struct tevent_threadpool_job_description *desc;
+ void *arg;
+ } state;
};
struct tevent_threadpool {
struct tevent_context *ev;
- struct threadpool *threadpool;
- int next_job_id;
- int sig_fd;
- struct tevent_req **pending;
- struct tevent_threadpool_state **orphaned;
- int num_orphaned;
+ size_t max_jobs;
+ struct tevent_threadpool_job *jobs;
+ struct tevent_queue *job_queue;
- struct tevent_fd *fde;
};
static void tevent_threadpool_handler(struct tevent_context *ev,
{
struct tevent_threadpool_state *state = tevent_req_data(
req, struct tevent_threadpool_state);
- struct tevent_threadpool *pool = state->pool;
+ //struct tevent_threadpool *pool = state->pool;
printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
}
printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
-
+#if 0
threadpool_job_cancel(state->job);
/*
tevent_threadpool_req_unset_pending(req);
pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
&req->data);
+#endif
}
-struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
- tevent_threadpool_fn_t fn,
- void *pdata,
- size_t data_size,
- const char *type,
- const char *location)
+static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
+{
+ if (state->pool == NULL) {
+ return 0;
+ }
+
+ DLIST_REMOVE(job->pool, job);
+ job->pool = NULL;
+ return 0;
+}
+
+struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
+ const struct tevent_threadpool_job_description *desc,
+ void *pstate,
+ const char *location)
{
struct tevent_threadpool_job *job;
- void **ppdata = (void **)pdata;
- void *data;
+ void **ppstate = (void **)pstate;
+ void *state;
+ size_t payload;
- job = talloc_pooled_object(mem_ctx, tevent_threadpool_job, 1,
- data_size);
- if (job == NULL) {
+ payload = sizeof(struct tevent_immediate) + desc->type_size;
+ if (payload < sizeof(struct tevent_immediate)) {
+ /* overflow */
return NULL;
}
- ZERO_STRUCTP(job);
- job->fn = fn;
-
- data = talloc_zero_size(job, data_size);
- if (data == NULL) {
- talloc_free(job);
+ job = talloc_pooled_object(mem_ctx, struct tevent_threadpool_job,
+ 2, payload);
+ if (job == NULL) {
return NULL;
}
- talloc_set_name_const(job, type);
- job->data = data;
+ *job = (struct tevent_threadpool_job) {
+ .internal = {
+ .im = tevent_create_immediate(job),
+ },
+ .state = {
+ .desc = desc,
+ .arg = talloc_zero_size(job, desc->type_size),
+ },
+ };
+ talloc_set_name_const(job->state.arg, desc->type);
- talloc_set_destructor(job, tevent_threaded_job_destructor);
+ talloc_set_destructor(job, tevent_threadpool_job_destructor);
- *ppdata = data;
+ *ppstate = state;
return job;
}
-struct tevent_req *tevent_threadpool_send(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- struct tevent_threadpool *pool,
- struct tevent_threadpool_job *job)
+struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
+ int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
+ void *pstate,
+ size_t state_size,
+ const char *type,
+ const char *location)
+{
+ return NULL;
+}
+
+struct metze_thread_state {
+ uint8_t tmp;
+};
+
+static int metze_thread_fn(struct metze_thread_state *state)
+{
+ return 0;
+}
+
+TEVENT_THREADPOOL_JOB_DECL(metze_thread_fn, struct metze_thread_state);
+
+struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tevent_threadpool *pool,
+ struct tevent_threadpool_job *job)
{
struct tevent_req *req;
struct tevent_threadpool_state *state;
+#if 0
+struct tevent_threadpool_job *j;
+struct metze_thread_state *jstate;
+
+ j = tevent_threadpool_job_create(mem_ctx, metze_thread_fn,
+ &jstate, struct metze_thread_state);
+ if (j == NULL) {
+ }
+#endif
req = tevent_req_create(mem_ctx, &state, struct tevent_threadpool_state);
if (req == NULL) {
return NULL;
printf("tevent_threadpool_send: sheduling job %d\n", state->job_id);
+
state->threadpool_job = threadpool_add_job(state,
state->pool->threadpool,
state->job_id,
state->job->fn,
- state->job->data);
+ state->job->state);
if (state->threadpool_job == NULL) {
printf("tevent_threadpool_send: job %d state->job == NULL\n", state->job_id);
tevent_req_error(req, errno);
TALLOC_FREE(orphaned_state);
}
-int tevent_threadpool_recv(struct tevent_req *req, int *perrno)
+int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
{
enum tevent_req_state req_state;
uint64_t error;