/*
tevent event library.
- Copyright (C) Jeremy Allison 2015
+ Copyright (C) Stefan Metzmacher 2016
** NOTE! The following LGPL license applies to the tevent
** library. This does NOT imply that all of Samba is released
struct tevent_threadpool_job {
struct tevent_threadpool_job *prev, *next;
- struct tevent_threadpool *pool;
+
+ struct {
+ struct tevent_threadpool *pool;
+ struct tevent_req *req;
+ } busy;
struct {
struct tevent_immediate *im;
- bool busy;
- bool done;
int ret;
//bool cancel;
} internal;
struct {
const struct tevent_threadpool_job_description *desc;
- void *arg;
+ void *args;
} state;
};
};
-static void tevent_threadpool_handler(struct tevent_context *ev,
- struct tevent_fd *fde,
- uint16_t flags, void *private_data);
-
static int tevent_threadpool_destructor(struct tevent_threadpool *pool)
{
tevent_queue_stop(pool->job_queue);
//TODO DLIST_REMOVE all pool->jobs
-#if 0
- printf("tevent_threadpool_destructor\n");
- while (talloc_array_length(pool->pending) != 0) {
- /* No TALLOC_FREE here */
- talloc_free(pool->pending[0]);
- }
-
- while (pool->num_orphaned != 0) {
- /*
- * We've got jobs in the queue for which the tevent_req has
- * been finished already. Wait for all of them to finish.
- */
- tevent_threadpool_handler(NULL, NULL, TEVENT_FD_READ, pool);
- }
-
- threadpool_destroy(pool->threadpool);
- pool->threadpool = NULL;
-#endif
return 0;
}
DLIST_ADD_END(ev->threads.pools, pool);
+ talloc_set_destructor(pool, tevent_threadpool_destructor);
return pool;
}
-#if 0
-static int tevent_threadpool_next_job_id(struct tevent_threadpool *pool)
-{
- int num_pending = talloc_array_length(pool->pending);
- int result;
-
- while (true) {
- int i;
-
- result = pool->next_job_id++;
- if (result == 0) {
- continue;
- }
-
- for (i = 0; i < num_pending; i++) {
- struct tevent_threadpool_state *state = tevent_req_data(
- pool->pending[i], struct tevent_threadpool_state);
-
- if (result == state->job_id) {
- break;
- }
- }
- if (i == num_pending) {
- return result;
- }
- }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req);
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state);
-static bool tevent_threadpool_req_set_pending(struct tevent_req *req,
- struct tevent_threadpool *pool,
- struct tevent_context *ev)
-{
- struct tevent_req **pending;
- int num_pending, orphaned_array_length;
-
- num_pending = talloc_array_length(pool->pending);
- printf("tevent_threadpool_req_set_pending: num_pending: %d\n",
- num_pending);
- pending = talloc_realloc(pool, pool->pending, struct tevent_req *,
- num_pending + 1);
- if (pending == NULL) {
- return false;
- }
- pending[num_pending] = req;
- num_pending++;
- pool->pending = pending;
- tevent_req_set_cleanup_fn(req, tevent_threadpool_req_cleanup);
-
- /*
- * Make sure that the orphaned array of
- * tevent_threadpool_state structs has enough space. A job can
- * change from pending to orphaned in
- * tevent_threadpool_cleanup, and to fail in a talloc
- * destructor should be avoided if possible.
- */
-
- orphaned_array_length = talloc_array_length(pool->orphaned);
- if (num_pending > orphaned_array_length) {
- struct tevent_threadpool_state **orphaned;
-
- orphaned = talloc_realloc(pool, pool->orphaned,
- struct tevent_threadpool_state *,
- orphaned_array_length + 1);
- if (orphaned == NULL) {
- tevent_threadpool_req_unset_pending(req);
- return false;
- }
- pool->orphaned = orphaned;
- }
-
- if (pool->fde != NULL) {
- return true;
- }
-
- pool->fde = tevent_add_fd(pool->ev,
- pool,
- pool->sig_fd, TEVENT_FD_READ,
- tevent_threadpool_handler, pool);
- if (pool->fde == NULL) {
- tevent_threadpool_req_unset_pending(req);
- return false;
- }
- return true;
-}
-
-static void tevent_threadpool_state_unset_orphaned(struct tevent_threadpool_state *state)
-{
- struct tevent_threadpool *pool = state->pool;
- int num_pending = talloc_array_length(pool->pending);
-
- if (num_pending == 0 && pool->num_orphaned == 0) {
- printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
- TALLOC_FREE(pool->fde);
- }
-}
-
-static void tevent_threadpool_req_unset_pending(struct tevent_req *req)
-{
- struct tevent_threadpool_state *state = tevent_req_data(
- req, struct tevent_threadpool_state);
- struct tevent_threadpool *pool = state->pool;
- int num_pending = talloc_array_length(pool->pending);
- int i;
-
- tevent_req_set_cleanup_fn(req, NULL);
-
- if (num_pending == 1) {
- if (pool->num_orphaned == 0) {
- printf("tevent_threadpool_req_unset_pending: disabling IPC fd\n");
- TALLOC_FREE(pool->fde);
- }
- TALLOC_FREE(pool->pending);
- return;
- }
-
- for (i = 0; i < num_pending; i++) {
- if (req == pool->pending[i]) {
- break;
- }
- }
- if (i == num_pending) {
- return;
- }
- if (num_pending > 1) {
- pool->pending[i] = pool->pending[num_pending - 1];
- }
- pool->pending = talloc_realloc(NULL, pool->pending, struct tevent_req *,
- num_pending - 1);
-}
-#endif
-static void tevent_threadpool_req_cleanup(struct tevent_req *req,
- enum tevent_req_state req_state)
+static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
{
-#if 0
- struct tevent_threadpool_state *state = tevent_req_data(
- req, struct tevent_threadpool_state);
- //struct tevent_threadpool *pool = state->pool;
-
- //printf("tevent_threadpool_req_cleanup: %d\n", state->job_id);
-
- switch (req_state) {
- case TEVENT_REQ_RECEIVED:
- break;
- default:
- // printf("tevent_threadpool_req_cleanup: %d, return\n", state->job_id);
- return;
+ if (job->busy.req != NULL) {
+ tevent_req_received(job->busy.req);
+ job->busy.req = NULL;
}
- if (state->done) {
- // printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
- tevent_threadpool_req_unset_pending(req);
- return;
+ if (job->busy.pool != NULL) {
+ DLIST_REMOVE(job->busy.pool->jobs, job);
+ job->busy.pool = NULL;
}
- //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
- threadpool_job_cancel(state->job);
-
- /*
- * Keep around the state of the deleted request until the
- * request has finished in the helper
- * thread. tevent_threadpool_handler will destroy it.
- */
- pool->num_orphaned++;
- tevent_threadpool_req_unset_pending(req);
- pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
- &req->data);
-#endif
-}
-
-static int tevent_threadpool_job_destructor(struct tevent_threadpool_job *job)
-{
- if (job->pool == NULL) {
- return 0;
- }
-
- //DLIST_REMOVE(job->pool, job);
- job->pool = NULL;
return 0;
}
struct tevent_threadpool_job *_tevent_threadpool_job_create(TALLOC_CTX *mem_ctx,
const struct tevent_threadpool_job_description *desc,
- void *pstate,
+ void *pargs,
const char *location)
{
struct tevent_threadpool_job *job;
- void **ppstate = (void **)pstate;
- void *state;
+ void **ppargs = (void **)pargs;
size_t payload;
- payload = sizeof(struct tevent_immediate) + desc->type_size;
+ payload = sizeof(struct tevent_immediate) + desc->args_size;
if (payload < sizeof(struct tevent_immediate)) {
/* overflow */
return NULL;
},
.state = {
.desc = desc,
- .arg = talloc_zero_size(job, desc->type_size),
+ .args = talloc_zero_size(job, desc->args_size),
},
};
- talloc_set_name_const(job->state.arg, desc->type);
+ talloc_set_name_const(job->state.args, desc->args_type);
talloc_set_destructor(job, tevent_threadpool_job_destructor);
- *ppstate = state;
+ *ppargs = job->state.args;
return job;
}
-struct metze_thread_args {
+
+struct metze_thread_fn_args {
uint8_t tmp;
};
-static int metze_thread_fn(struct metze_thread_args *args)
+static int metze_thread_fn(struct metze_thread_fn_args *args)
{
return 0;
}
-TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread)
+TEVENT_THREADPOOL_JOB_DESCRIPTION_DECL(metze_thread_fn)
struct tevent_threadpool_job_state {
struct tevent_threadpool_job *job;
struct tevent_threadpool_job_state *state =
tevent_req_data(req,
struct tevent_threadpool_job_state);
+ struct tevent_threadpool_job *job = state->job;
+
+ if (job == NULL) {
+ return;
+ }
switch (req_state) {
case TEVENT_REQ_RECEIVED:
return;
}
+ state->job = NULL;
- if (state->done) {
- // printf("tevent_threadpool_req_cleanup: %d, done\n", state->job_id);
- tevent_threadpool_req_unset_pending(req);
- return;
- }
-
- //printf("tevent_threadpool_req_cleanup: %d, cleanup\n", state->job_id);
- threadpool_job_cancel(state->job);
-
- /*
- * Keep around the state of the deleted request until the
- * request has finished in the helper
- * thread. tevent_threadpool_handler will destroy it.
- */
- pool->num_orphaned++;
- tevent_threadpool_req_unset_pending(req);
- pool->orphaned[pool->num_orphaned - 1] = talloc_move(pool->orphaned,
- &req->data);
-#endif
+ job->busy.req = NULL;
}
+
struct tevent_req *tevent_threadpool_job_send(TALLOC_CTX *mem_ctx,
struct tevent_context *ev,
struct tevent_threadpool *pool,
{
struct tevent_req *req;
struct tevent_threadpool_job_state *state;
+ struct tevent_threadpool_job *j;
+ struct metze_thread_fn_args *args;
req = tevent_req_create(mem_ctx, &state,
struct tevent_threadpool_job_state);
}
state->job = job;
+ tevent_req_set_cleanup_fn(req, tevent_threadpool_job_cleanup);
+
+ j = tevent_threadpool_job_create(state, metze_thread_fn, &args);
+ if (tevent_req_nomem(j, req)) {
+ return tevent_req_post(req, ev);
+ }
+
return req;
}
int tevent_threadpool_job_recv(struct tevent_req *req, int *perrno)
{
- struct tevent_threadpool_job_state *state =
- tevent_req_data(req,
- struct tevent_threadpool_job_state);
enum tevent_req_state req_state;
uint64_t error;