* requests in a helper threadpool.
*/
+#ifdef DOXYGEN
/**
* @brief Create a tevent_threadpool
*
struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
int max_threads);
+#else
+struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int max_threads,
+ const char *location);
+#define tevent_threadpool_create(mem_ctx, ev, max_threads) \
+ _tevent_threadpool_job_create((mem_ctx), (ev), (max_threads), \
+ __location__)
+
+#endif
/**
* @brief Destroy a tevent_threadpool
* @param[in] type The name of the job.
*/
struct tevent_threadpool_job *tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
- #type,
- int (*fn)(type *state),
- struct tevent_threadpool_job **pjob,
+ #name,
void **pstate);
#else
struct tevent_threadpool_job_description {
};
#define TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(__name) \
-static const union { \
- struct tevent_threadpool_job_description generic; \
- struct { \
- const char *type; \
- size_t type_size; \
- int (*fn)(struct __name##_arg *arg); \
- } specific; \
-} __name##_description = { \
- .specific = { \
- .type = "struct " #__name "_arg", \
- .type_size = sizeof(struct __name##_arg), \
- .fn = __name##_fn, \
- }, \
-};
-#define TEVENT_THREADPOOL_JOB_PTR(fn) \
- (&tevent_threadpool_job##fn.generic)
+static const struct tevent_threadpool_job_description *__name##_description(void) \
+{ \
+ static const union { \
+ struct tevent_threadpool_job_description generic; \
+ struct { \
+ const char *type; \
+ size_t type_size; \
+ int (*fn)(struct __name##_args *args); \
+ } specific; \
+ } description = { \
+ .specific = { \
+ .type = "struct " #__name "_args", \
+ .type_size = sizeof(struct __name##_args), \
+ .fn = __name##_fn, \
+ }, \
+ }; \
+ \
+ return &description.generic; \
+}
struct tevent_threadpool_job *__tevent_threadpool_job_create(TALLOC_CTX *mem_ct,
const struct tevent_threadpool_job_description *desc,
void *pstate,
const char *location);
-struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
- int (*fn)(void *), //union tevent_threadpool_job_fn_t fn,
- void *pstate,
- size_t state_size,
- const char *type,
- const char *location);
-
-#if (__GNUC__ >= 3)
-#if 0
-#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) ( \
- int (*__function_ptr)(type *state) = (fn), \
- _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
- sizeof(type), #type, __location))
-#define __tevent_threadpool_job_create(mem_ctx, type, fn, pjob, pstate) do { \
- int (*__function_ptr)(type *) = (fn); \
- *pjob = _tevent_threadpool_job_create((mem_ctx), (int (*)(void *))(__function_ptr), (pstate), \
- sizeof(type), #type, __location__); \
-#endif
-#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
- _tevent_threadpool_job_create((mem_ctx), \
- ( (\
- ((union { \
- int (*generic_fn)(void *state); \
- int (*specific_fn)(type *state); \
- }) { .specific_fn = fn, })),\
- (pstate), sizeof(type), #type, __location__)
-#else
-#define tevent_threadpool_job_create(mem_ctx, fn, pstate, type) \
- _tevent_threadpool_job_create((mem_ctx), (int *)(void *)(fn), (pstate), \
- sizeof(type), #type, __location__)
-#endif
+#define tevent_threadpool_job_create(mem_ctx, name, pstate) \
+ _tevent_threadpool_job_create((mem_ctx), __name##_description(), \
+ (pstate), __location__)
#endif
};
struct tevent_threadpool {
+ struct tevent_threadpool *prev, *next;
struct tevent_context *ev;
- size_t max_jobs;
+ size_t max_threads;
struct tevent_threadpool_job *jobs;
struct tevent_queue *job_queue;
static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
{
+ tevent_queue_stop(pool->job_queue);
+
+ //TODO DLIST_REMOVE all pool->jobs
+
+#if 0
printf("tevent_threadpool_destructor\n");
while (talloc_array_length(pool->pending) != 0) {
/* No TALLOC_FREE here */
threadpool_destroy(pool->threadpool);
pool->threadpool = NULL;
-
+#endif
return 0;
}
-struct tevent_threadpool *tevent_threadpool_create(TALLOC_CTX *mem_ctx,
- struct tevent_context *ev,
- int max_threads)
+struct tevent_threadpool *_tevent_threadpool_create(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ int max_threads,
+ const char *location)
{
struct tevent_threadpool *pool;
- pool = talloc_zero(mem_ctx, struct tevent_threadpool);
- if (pool == NULL) {
+ if (ev->wrapper.glue != NULL) {
+ /*
+ * stacking of wrappers is not supported
+ */
+ tevent_debug(ev->wrapper.glue->main_ev, TEVENT_DEBUG_FATAL,
+ "%s: %s() stacking not allowed",
+ __func__, location);
+ errno = ENOSYS;
return NULL;
}
- pool->ev = ev;
- pool->threadpool = threadpool_init(pool, max_threads);
- if (pool->threadpool == NULL) {
- TALLOC_FREE(pool);
+ pool = talloc_zero(mem_ctx, struct tevent_threadpool);
+ if (pool == NULL) {
return NULL;
}
- talloc_set_destructor(pool, tevent_threadpool_destructor);
+ *pool = (struct tevent_threadpool) {
+ .ev = ev,
+ .max_threads = max_threads,
+ };
- pool->sig_fd = threadpool_signal_fd(pool->threadpool);
- if (pool->sig_fd == -1) {
+ pool->job_queue = tevent_queue_create(pool, "job_queue");
+ if (pool->job_queue == NULL) {
TALLOC_FREE(pool);
return NULL;
}
+ DLIST_ADD_END(ev->threads.pools, pool);
+
return pool;
}
-
+#if 0
static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
{
int num_pending = talloc_array_length(pool->pending);
pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
num_pending - 1);
}
-
+#endif
static void tevent_threadpool_req_cleanup(struct tevent_req *req,
enum tevent_req_state req_state)
{
req, struct tevent_threadpool_state);
//struct tevent_threadpool *pool = state->pool;
- printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
+ //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
switch (req_state) {
case TEVENT_REQ_RECEIVED:
break;
default:
- printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
+ // printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
return;
}
if (state->done) {
- printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
+ // printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
tevent_threadpool_req_unset_pending(req);
return;
}
- printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
+ //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
#if 0
threadpool_job_cancel(state->job);
return NULL;
}
-struct metze_thread_state {
+struct metze_thread_args {
uint8_t tmp;
};
return 0;
}
-TEVENT_THREADPOOL_JOB_DECL(metze_thread_fn, struct metze_thread_state);
+TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,