talloc_free(ev);
return true;
}
+
+struct tevent_threaded_test_state {
+ bool ok;
+ int id;
+ char *msg;
+};
+
+static void tevent_threaded_test_do(void *private_data);
+static void tevent_threaded_test_done(struct tevent_req *subreq);
+
+static bool tevent_threaded_test(struct torture_context *test,
+ const void *test_data)
+{
+ int result;
+ bool ok;
+ struct tevent_context *ev;
+ struct tevent_threadpool *pool;
+ struct tevent_threaded_test_state *state;
+#define NUMREQ 10
+ struct tevent_req *subreq[NUMREQ];
+ int i;
+
+ talloc_disable_null_tracking();
+
+ ev = tevent_context_init(NULL);
+ torture_assert_goto(test, ev != NULL, ok, done,
+ "tevent_context_init failed\n");
+
+ tevent_set_debug_stderr(ev);
+
+ pool = tevent_threadpool_create(ev, ev, 10);
+ torture_assert_goto(test, pool != NULL, ok, done,
+ "tevent_threaded_init failed\n");
+
+ /*
+ * Starting at 1 helps debugging as the threadpool uses job
+ * ids starting at 1
+ */
+ for (i = 1; i <= NUMREQ; i++) {
+#undef NUMREQ
+ printf("[mainthread] Sending request %d\n", i);
+
+ state = talloc_zero(NULL, struct tevent_threaded_test_state);
+ torture_assert_goto(test, state != NULL, ok, done,
+ "talloc_zero failed\n");
+
+ state->id = i;
+ state->msg = talloc_asprintf(state, "request %d", state->id);
+ torture_assert_goto(test, state->msg != NULL, ok, done,
+ "talloc_asprintf failed\n");
+
+ subreq[i-1] = tevent_threadpool_send(pool,
+ tevent_threaded_test_do,
+ state);
+ torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
+ "tevent_threadpool_send failed\n");
+ tevent_req_set_callback(subreq[i-1], tevent_threaded_test_done, state);
+ }
+
+ result = tevent_loop_wait(ev);
+ torture_assert_goto(test, result == 0, ok, done,
+ "tevent_loop_wait failed\n");
+
+ ok = true;
+
+done:
+ talloc_free(ev);
+ return ok;
+}
+
+static void tevent_threaded_test_do(void *private_data)
+{
+ struct tevent_threaded_test_state *state =
+ talloc_get_type_abort(private_data, struct tevent_threaded_test_state);
+
+ printf("[workerthread] job id: %d, msg: %s\n", state->id, state->msg);
+ talloc_free(state->msg);
+
+ state->msg = talloc_asprintf(state, "Thread %d responding", state->id);
+ if (state->msg == NULL) {
+ state->ok = false;
+ return;
+ }
+ state->ok = true;
+}
+
+static void tevent_threaded_test_done(struct tevent_req *subreq)
+{
+ int result, error;
+ struct tevent_threaded_test_state *state =
+ tevent_req_callback_data(subreq, struct tevent_threaded_test_state);
+
+ printf("[mainthread] callback: %d done, %s\n", state->id, state->msg);
+
+ result = tevent_threadpool_recv(subreq, &error);
+ if (result != 0) {
+ printf("[mainthread] recv failed!\n");
+ }
+ if (!state->ok) {
+ printf("[mainthread] failed!\n");
+ }
+
+ TALLOC_FREE(subreq);
+ TALLOC_FREE(state);
+}
+
+struct tevent_threaded_canceltest_state {
+ struct torture_context *tctx;
+ bool *torture_result;
+ bool ok;
+ int id;
+ char *msg;
+};
+
+static void tevent_threaded_canceltest_do(void *private_data);
+static void tevent_threaded_canceltest_done(struct tevent_req *subreq);
+
+static bool tevent_threaded_canceltest(struct torture_context *test,
+ const void *test_data)
+{
+ int result;
+ bool ok = true;
+ struct tevent_context *ev;
+ struct tevent_threadpool *pool;
+ struct tevent_threaded_canceltest_state *state;
+ struct tevent_req *subreq[3];
+ int i;
+
+ talloc_disable_null_tracking();
+
+ ev = tevent_context_init(NULL);
+ torture_assert_goto(test, ev != NULL, ok, done,
+ "tevent_context_init failed\n");
+
+ tevent_set_debug_stderr(ev);
+
+ pool = tevent_threadpool_create(ev, ev, 1);
+ torture_assert_goto(test, pool != NULL, ok, done,
+ "tevent_threaded_init failed\n");
+
+ for (i = 1; i <= 2; i++) {
+ printf("[mainthread] Sending request %d\n", i);
+
+ state = talloc_zero(pool, struct tevent_threaded_canceltest_state);
+ torture_assert_goto(test, state != NULL, ok, done,
+ "talloc_zero failed\n");
+
+ state->id = i;
+ state->tctx = test;
+ state->torture_result = &ok;
+ state->msg = talloc_asprintf(state, "request %d", state->id);
+ torture_assert_goto(test, state->msg != NULL, ok, done,
+ "talloc_asprintf failed\n");
+
+ subreq[i-1] = tevent_threadpool_send(pool,
+ tevent_threaded_canceltest_do,
+ state);
+ torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
+ "tevent_threadpool_send failed\n");
+ tevent_req_set_callback(subreq[i-1], tevent_threaded_canceltest_done, state);
+ }
+
+ TALLOC_FREE(subreq[1]);
+
+ result = tevent_loop_wait(ev);
+ torture_assert_goto(test, result == 0, ok, done,
+ "tevent_loop_wait failed\n");
+
+ printf("sleeping...\n");
+ sleep(5);
+
+ /*
+ * Now test reusing state of the request we cancelled via
+ * TALLOC_FREE()
+ */
+ state->id = 3;
+ talloc_free(state->msg);
+ state->msg = talloc_asprintf(state, "request %d", state->id);
+ torture_assert_goto(test, state->msg != NULL, ok, done,
+ "talloc_asprintf failed\n");
+ subreq[2] = tevent_threadpool_send(pool,
+ tevent_threaded_canceltest_do,
+ state);
+ torture_assert_goto(test, subreq[2] != NULL, ok, done,
+ "tevent_threadpool_send failed\n");
+ tevent_req_set_callback(subreq[2], tevent_threaded_canceltest_done, state);
+
+ result = tevent_loop_wait(ev);
+ torture_assert_goto(test, result == 0, ok, done,
+ "tevent_loop_wait failed\n");
+
+ if (!ok) {
+ torture_fail(test, "some error...\n");
+ }
+
+done:
+ talloc_free(ev);
+ return ok;
+}
+
+static void tevent_threaded_canceltest_do(void *private_data)
+{
+ struct tevent_threaded_canceltest_state *state =
+ talloc_get_type_abort(private_data, struct tevent_threaded_canceltest_state);
+
+ printf("[workerthread] %s\n", state->msg);
+
+ /*
+ * Sleep in the first sheduled job, so the second can be
+ * cancelled before it gets sheduled
+ */
+ sleep(1);
+
+ talloc_free(state->msg);
+ state->msg = talloc_asprintf(state, "job id %d done", state->id);
+ if (state->msg == NULL) {
+ state->ok = false;
+ return;
+ }
+ state->ok = true;
+}
+
+static void tevent_threaded_canceltest_done(struct tevent_req *subreq)
+{
+ int result, error;
+ struct tevent_threaded_canceltest_state *state =
+ tevent_req_callback_data(subreq, struct tevent_threaded_canceltest_state);
+
+ printf("[mainthread] callback job %d\n", state->id);
+
+ result = tevent_threadpool_recv(subreq, &error);
+ if (result != 0) {
+ if (error == EINTR) {
+ printf("[mainthread] cancelled request\n");
+ *state->torture_result = false;
+ }
+ printf("[mainthread] recv failed!\n");
+ }
+ if (!state->ok) {
+ printf("[mainthread] computation failed\n");
+ *state->torture_result = false;
+ } else {
+ printf(state->msg);
+ }
+
+ if (state->id == 2) {
+ printf("[mainthread] job 2's callback shouldn't be called!\n");
+ *state->torture_result = false;
+ }
+
+ TALLOC_FREE(subreq);
+ TALLOC_FREE(state);
+}
#endif
struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
test_multi_tevent_threaded_2,
NULL);
+ torture_suite_add_simple_tcase_const(suite, "tevent_threaded_test",
+ tevent_threaded_test,
+ NULL);
+ }
+
+ torture_suite_add_simple_tcase_const(suite, "tevent_threaded_canceltest",
+ tevent_threaded_canceltest,
+ NULL);
#endif
return suite;