WIP: add a test for tevent_threadpool_send/recv()
authorRalph Boehme <slow@samba.org>
Wed, 23 Sep 2015 05:41:17 +0000 (07:41 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:51:49 +0000 (09:51 +0200)
Signed-off-by: Ralph Boehme <slow@samba.org>
lib/tevent/testsuite.c

index 08aa7588acee9977b5b92a6421e3c50570dc11cd..3c7b2bd468f46d8b59631c933a89f459c741dbc0 100644 (file)
@@ -1255,6 +1255,259 @@ static bool test_multi_tevent_threaded_2(struct torture_context *test,
        talloc_free(ev);
        return true;
 }
+
+struct tevent_threaded_test_state {
+       bool ok;
+       int id;
+       char *msg;
+};
+
+static void tevent_threaded_test_do(void *private_data);
+static void tevent_threaded_test_done(struct tevent_req *subreq);
+
+static bool tevent_threaded_test(struct torture_context *test,
+                                const void *test_data)
+{
+       int result;
+       bool ok;
+       struct tevent_context *ev;
+       struct tevent_threadpool *pool;
+       struct tevent_threaded_test_state *state;
+#define NUMREQ 10
+       struct tevent_req *subreq[NUMREQ];
+       int i;
+
+       talloc_disable_null_tracking();
+
+       ev = tevent_context_init(NULL);
+       torture_assert_goto(test, ev != NULL, ok, done,
+                           "tevent_context_init failed\n");
+
+       tevent_set_debug_stderr(ev);
+
+       pool = tevent_threadpool_create(ev, ev, 10);
+       torture_assert_goto(test, pool != NULL, ok, done,
+                           "tevent_threaded_init failed\n");
+
+       /*
+        * Starting at 1 helps debugging as the threadpool uses job
+        * ids starting at 1
+        */
+       for (i = 1; i <= NUMREQ; i++) {
+#undef NUMREQ
+               printf("[mainthread] Sending request %d\n", i);
+
+               state = talloc_zero(NULL, struct tevent_threaded_test_state);
+               torture_assert_goto(test, state != NULL, ok, done,
+                                   "talloc_zero failed\n");
+
+               state->id = i;
+               state->msg = talloc_asprintf(state, "request %d", state->id);
+               torture_assert_goto(test, state->msg != NULL, ok, done,
+                                   "talloc_asprintf failed\n");
+
+               subreq[i-1] = tevent_threadpool_send(pool,
+                                                    tevent_threaded_test_do,
+                                                    state);
+               torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
+                                   "tevent_threadpool_send failed\n");
+               tevent_req_set_callback(subreq[i-1], tevent_threaded_test_done, state);
+       }
+
+       result = tevent_loop_wait(ev);
+       torture_assert_goto(test, result == 0, ok, done,
+                           "tevent_loop_wait failed\n");
+
+       ok = true;
+
+done:
+       talloc_free(ev);
+       return ok;
+}
+
+static void tevent_threaded_test_do(void *private_data)
+{
+       struct tevent_threaded_test_state *state =
+               talloc_get_type_abort(private_data, struct tevent_threaded_test_state);
+
+       printf("[workerthread] job id: %d, msg: %s\n", state->id, state->msg);
+       talloc_free(state->msg);
+
+       state->msg = talloc_asprintf(state, "Thread %d responding", state->id);
+       if (state->msg == NULL) {
+               state->ok = false;
+               return;
+       }
+       state->ok = true;
+}
+
+static void tevent_threaded_test_done(struct tevent_req *subreq)
+{
+       int result, error;
+       struct tevent_threaded_test_state *state =
+               tevent_req_callback_data(subreq, struct tevent_threaded_test_state);
+
+       printf("[mainthread] callback: %d done, %s\n", state->id, state->msg);
+
+       result = tevent_threadpool_recv(subreq, &error);
+       if (result != 0) {
+               printf("[mainthread] recv failed!\n");
+       }
+       if (!state->ok) {
+               printf("[mainthread] failed!\n");
+       }
+
+       TALLOC_FREE(subreq);
+       TALLOC_FREE(state);
+}
+
+struct tevent_threaded_canceltest_state {
+       struct torture_context *tctx;
+       bool *torture_result;
+       bool ok;
+       int id;
+       char *msg;
+};
+
+static void tevent_threaded_canceltest_do(void *private_data);
+static void tevent_threaded_canceltest_done(struct tevent_req *subreq);
+
+static bool tevent_threaded_canceltest(struct torture_context *test,
+                                      const void *test_data)
+{
+       int result;
+       bool ok = true;
+       struct tevent_context *ev;
+       struct tevent_threadpool *pool;
+       struct tevent_threaded_canceltest_state *state;
+       struct tevent_req *subreq[3];
+       int i;
+
+       talloc_disable_null_tracking();
+
+       ev = tevent_context_init(NULL);
+       torture_assert_goto(test, ev != NULL, ok, done,
+                           "tevent_context_init failed\n");
+
+       tevent_set_debug_stderr(ev);
+
+       pool = tevent_threadpool_create(ev, ev, 1);
+       torture_assert_goto(test, pool != NULL, ok, done,
+                           "tevent_threaded_init failed\n");
+
+       for (i = 1; i <= 2; i++) {
+               printf("[mainthread] Sending request %d\n", i);
+
+               state = talloc_zero(pool, struct tevent_threaded_canceltest_state);
+               torture_assert_goto(test, state != NULL, ok, done,
+                                   "talloc_zero failed\n");
+
+               state->id = i;
+               state->tctx = test;
+               state->torture_result = &ok;
+               state->msg = talloc_asprintf(state, "request %d", state->id);
+               torture_assert_goto(test, state->msg != NULL, ok, done,
+                                   "talloc_asprintf failed\n");
+
+               subreq[i-1] = tevent_threadpool_send(pool,
+                                                    tevent_threaded_canceltest_do,
+                                                    state);
+               torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
+                                   "tevent_threadpool_send failed\n");
+               tevent_req_set_callback(subreq[i-1], tevent_threaded_canceltest_done, state);
+       }
+
+       TALLOC_FREE(subreq[1]);
+
+       result = tevent_loop_wait(ev);
+       torture_assert_goto(test, result == 0, ok, done,
+                           "tevent_loop_wait failed\n");
+
+       printf("sleeping...\n");
+       sleep(5);
+
+       /*
+        * Now test reusing state of the request we cancelled via
+        * TALLOC_FREE()
+        */
+       state->id = 3;
+       talloc_free(state->msg);
+       state->msg = talloc_asprintf(state, "request %d", state->id);
+       torture_assert_goto(test, state->msg != NULL, ok, done,
+                           "talloc_asprintf failed\n");
+       subreq[2] = tevent_threadpool_send(pool,
+                                          tevent_threaded_canceltest_do,
+                                          state);
+       torture_assert_goto(test, subreq[2] != NULL, ok, done,
+                           "tevent_threadpool_send failed\n");
+       tevent_req_set_callback(subreq[2], tevent_threaded_canceltest_done, state);
+
+       result = tevent_loop_wait(ev);
+       torture_assert_goto(test, result == 0, ok, done,
+                           "tevent_loop_wait failed\n");
+
+       if (!ok) {
+               torture_fail(test, "some error...\n");
+       }
+
+done:
+       talloc_free(ev);
+       return ok;
+}
+
+static void tevent_threaded_canceltest_do(void *private_data)
+{
+       struct tevent_threaded_canceltest_state *state =
+               talloc_get_type_abort(private_data, struct tevent_threaded_canceltest_state);
+
+       printf("[workerthread] %s\n", state->msg);
+
+       /*
+        * Sleep in the first sheduled job, so the second can be
+        * cancelled before it gets sheduled
+        */
+       sleep(1);
+
+       talloc_free(state->msg);
+       state->msg = talloc_asprintf(state, "job id %d done", state->id);
+       if (state->msg == NULL) {
+               state->ok = false;
+               return;
+       }
+       state->ok = true;
+}
+
+static void tevent_threaded_canceltest_done(struct tevent_req *subreq)
+{
+       int result, error;
+       struct tevent_threaded_canceltest_state *state =
+               tevent_req_callback_data(subreq, struct tevent_threaded_canceltest_state);
+
+       printf("[mainthread] callback job %d\n", state->id);
+
+       result = tevent_threadpool_recv(subreq, &error);
+       if (result != 0) {
+               if (error == EINTR) {
+                       printf("[mainthread] cancelled request\n");
+                       *state->torture_result = false;
+               }
+               printf("[mainthread] recv failed!\n");
+       }
+       if (!state->ok) {
+               printf("[mainthread] computation failed\n");
+               *state->torture_result = false;
+       } else {
+               printf(state->msg);
+       }
+
+       if (state->id == 2) {
+               printf("[mainthread] job 2's callback shouldn't be called!\n");
+               *state->torture_result = false;
+       }
+
+       TALLOC_FREE(subreq);
+       TALLOC_FREE(state);
+}
 #endif
 
 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
@@ -1301,6 +1554,14 @@ struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
                                             test_multi_tevent_threaded_2,
                                             NULL);
 
+       torture_suite_add_simple_tcase_const(suite, "tevent_threaded_test",
+                                            tevent_threaded_test,
+                                            NULL);
+       }
+
+       torture_suite_add_simple_tcase_const(suite, "tevent_threaded_canceltest",
+                                            tevent_threaded_canceltest,
+                                            NULL);
 #endif
 
        return suite;