2 Unix SMB/CIFS implementation.
4 testing of the events subsystem
6 Copyright (C) Stefan Metzmacher 2006-2009
7 Copyright (C) Jeremy Allison 2013
9 ** NOTE! The following LGPL license applies to the tevent
10 ** library. This does NOT imply that all of Samba is released
13 This library is free software; you can redistribute it and/or
14 modify it under the terms of the GNU Lesser General Public
15 License as published by the Free Software Foundation; either
16 version 3 of the License, or (at your option) any later version.
18 This library is distributed in the hope that it will be useful,
19 but WITHOUT ANY WARRANTY; without even the implied warranty of
20 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
21 Lesser General Public License for more details.
23 You should have received a copy of the GNU Lesser General Public
24 License along with this library; if not, see <http://www.gnu.org/licenses/>.
29 #include "system/filesys.h"
30 #include "system/select.h"
31 #include "system/network.h"
32 #include "torture/torture.h"
33 #include "torture/local/proto.h"
/*
 * Read up to count bytes from fd into buf, retrying while the read is
 * interrupted by a signal (EINTR).  The final read() result is
 * discarded; callers only use this to consume bytes from a pipe/socket.
 */
44 static void do_read(int fd, void *buf, size_t count)
49 ret = read(fd, buf, count);
50 } while (ret == -1 && errno == EINTR);
/*
 * tevent_fd handler for the pipe's read end.  private_data is the
 * int fd[2] pipe array from test_event_context().  It raises SIGUSR1
 * and SIGALRM at our own process (feeding the signal-count handlers)
 * and then drains one byte from fd[0].
 */
53 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
54 uint16_t flags, void *private_data)
56 int *fd = (int *)private_data;
59 kill(getpid(), SIGUSR1);
61 kill(getpid(), SIGALRM);
63 do_read(fd[0], &c, 1);
/*
 * Write up to count bytes from buf to fd, retrying while the write is
 * interrupted by a signal (EINTR).  The final write() result is
 * discarded; used only to push test bytes into a pipe/socket.
 */
67 static void do_write(int fd, void *buf, size_t count)
72 ret = write(fd, buf, count);
73 } while (ret == -1 && errno == EINTR);
/*
 * tevent_fd handler for the pipe's write end: pushes one byte into
 * fd[1] so the matching read handler fires again, keeping the
 * read/write ping-pong going.  private_data is the int fd[2] array.
 */
76 static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
77 uint16_t flags, void *private_data)
79 int *fd = (int *)private_data;
82 do_write(fd[1], &c, 1);
/* This will only fire if the fd's returned from pipe() are bi-directional. */
/*
 * Mirror of fde_handler_read for the reverse direction: raises the two
 * test signals and drains one byte from fd[1] instead of fd[0].
 */
87 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
88 uint16_t flags, void *private_data)
90 int *fd = (int *)private_data;
93 kill(getpid(), SIGUSR1);
95 kill(getpid(), SIGALRM);
97 do_read(fd[1], &c, 1);
/* This will only fire if the fd's returned from pipe() are bi-directional. */
/*
 * Mirror of fde_handler_write for the reverse direction: writes one
 * byte to fd[0] instead of fd[1].
 */
102 static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
103 uint16_t flags, void *private_data)
105 int *fd = (int *)private_data;
107 do_write(fd[0], &c, 1);
/*
 * Timer handler that ends the main event loop: private_data points at
 * the caller's "finished" flag (presumably set non-zero here — the
 * assignment line is not visible in this view; TODO confirm).
 */
110 static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
111 struct timeval tval, void *private_data)
113 int *finished = (int *)private_data;
/*
 * tevent_signal handler used for both SIGALRM and SIGUSR1: private_data
 * points at a per-signal counter (alarm_count / info_count) which is
 * incremented per delivery (increment line not visible here).
 */
117 static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
118 int signum, int count, void *info, void *private_data)
120 int *countp = (int *)private_data;
/*
 * Core event-context torture test, run once per tevent backend
 * (test_data is the backend name).  It wires read/write fd events on
 * both ends of a pipe, adds SIGALRM handlers with SA_RESTART and
 * SA_RESETHAND plus a SIGUSR1 handler with SA_SIGINFO, then runs the
 * loop and checks the signal counters match the number of fd events.
 */
124 static bool test_event_context(struct torture_context *test,
125 const void *test_data)
127 struct tevent_context *ev_ctx;
128 int fd[2] = { -1, -1 };
129 const char *backend = (const char *)test_data;
130 int alarm_count=0, info_count=0;
131 struct tevent_fd *fde_read;
132 struct tevent_fd *fde_read_1;
133 struct tevent_fd *fde_write;
134 struct tevent_fd *fde_write_1;
136 struct tevent_signal *se1 = NULL;
139 struct tevent_signal *se2 = NULL;
142 struct tevent_signal *se3 = NULL;
148 ev_ctx = tevent_context_init_byname(test, backend);
149 if (ev_ctx == NULL) {
150 torture_comment(test, "event backend '%s' not supported\n", backend);
154 torture_comment(test, "backend '%s' - %s\n",
155 backend, __FUNCTION__);
162 torture_assert_int_equal(test, ret, 0, "pipe failed");
/* fd events on both pipe ends; the *_1 handlers only fire if the
 * pipe fds happen to be bi-directional on this platform. */
164 fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
165 fde_handler_read, fd);
166 fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
167 fde_handler_write_1, fd);
169 fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
170 fde_handler_write, fd);
171 fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
172 fde_handler_read_1, fd);
174 tevent_fd_set_auto_close(fde_read);
175 tevent_fd_set_auto_close(fde_write);
/* timer ends the main loop after 2 seconds */
177 tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
178 finished_handler, &finished);
181 se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
182 torture_assert(test, se1 != NULL, "failed to setup se1");
/* SA_RESETHAND: se2 removes itself after its first delivery */
185 se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
186 torture_assert(test, se2 != NULL, "failed to setup se2");
189 se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
190 torture_assert(test, se3 != NULL, "failed to setup se3");
193 t = timeval_current();
196 if (tevent_loop_once(ev_ctx) == -1) {
198 torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
/* remove the fd events, then drain any still-pending signals */
203 talloc_free(fde_read_1);
204 talloc_free(fde_write_1);
205 talloc_free(fde_read);
206 talloc_free(fde_write);
208 while (alarm_count < fde_count+1) {
209 if (tevent_loop_once(ev_ctx) == -1) {
214 torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
/* +1: se2 (SA_RESETHAND) contributed exactly one extra SIGALRM count */
220 torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
224 * we do not call talloc_free(se2)
225 * because it is already gone,
226 * after triggering the event handler.
232 torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
/*
 * Shared state for test_event_fd1: the torture/tevent contexts, the
 * restart timer and the two fd events monitoring a socketpair
 * (further fields such as sock[], finished, error, got_read/got_write,
 * drain_done, loop_count are not visible in this view).
 */
240 struct test_event_fd1_state {
241 struct torture_context *tctx;
243 struct tevent_context *ev;
245 struct tevent_timer *te;
246 struct tevent_fd *fde0;
247 struct tevent_fd *fde1;
/*
 * fd event handler for test_event_fd1.  Phases:
 *  - before drain: expect exactly TEVENT_FD_WRITE first (write one byte
 *    to the peer, then flip to waiting for TEVENT_FD_READ), then
 *    exactly TEVENT_FD_READ (disable monitoring again);
 *  - drain phase (after sock[1] was closed): expect TEVENT_FD_READ,
 *    read until error/EOF, then disable all events and mark drain_done.
 * Any unexpected flag combination finishes the test with an error
 * location recorded in state->error.
 */
257 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
258 struct tevent_fd *fde,
262 struct test_event_fd1_state *state =
263 (struct test_event_fd1_state *)private_data;
/* drain already completed — no further events are allowed */
265 if (state->drain_done) {
266 state->finished = true;
267 state->error = __location__;
275 if (!(flags & TEVENT_FD_READ)) {
276 state->finished = true;
277 state->error = __location__;
281 ret = read(state->sock[0], &c, 1);
/* error/EOF on the drained socket: stop monitoring, drain is done */
289 tevent_fd_set_flags(fde, 0);
290 state->drain_done = true;
294 if (!state->got_write) {
297 if (flags != TEVENT_FD_WRITE) {
298 state->finished = true;
299 state->error = __location__;
302 state->got_write = true;
305 * we write to the other socket...
307 do_write(state->sock[1], &c, 1);
308 TEVENT_FD_NOT_WRITEABLE(fde);
309 TEVENT_FD_READABLE(fde);
313 if (!state->got_read) {
314 if (flags != TEVENT_FD_READ) {
315 state->finished = true;
316 state->error = __location__;
319 state->got_read = true;
321 TEVENT_FD_NOT_READABLE(fde);
/* fall-through: an event fired when none was expected */
325 state->finished = true;
326 state->error = __location__;
/*
 * Timer callback driving test_event_fd1 through its loop iterations.
 * Verifies that both the write and the read event were seen in the
 * last round, resets the per-round flags, re-arms the fd for
 * TEVENT_FD_WRITE and reschedules itself.  After enough rounds it
 * frees fde1 (auto-closing sock[1]) so the final round exercises the
 * drain/EOF path; once drain_done is set it terminates the test.
 */
330 static void test_event_fd1_finished(struct tevent_context *ev_ctx,
331 struct tevent_timer *te,
335 struct test_event_fd1_state *state =
336 (struct test_event_fd1_state *)private_data;
338 if (state->drain_done) {
339 state->finished = true;
343 if (!state->got_write) {
344 state->finished = true;
345 state->error = __location__;
349 if (!state->got_read) {
350 state->finished = true;
351 state->error = __location__;
/* safety net: the test must not loop forever */
356 if (state->loop_count > 3) {
357 state->finished = true;
358 state->error = __location__;
362 state->got_write = false;
363 state->got_read = false;
365 tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
367 if (state->loop_count > 2) {
/* freeing fde1 auto-closes sock[1] -> next round drains sock[0] */
369 TALLOC_FREE(state->fde1);
370 TEVENT_FD_READABLE(state->fde0);
373 state->te = tevent_add_timer(state->ev, state->ev,
374 timeval_current_ofs(0,2000),
375 test_event_fd1_finished, state);
/*
 * Per-backend fd-event test: monitors one end of an AF_UNIX socketpair
 * with tevent_fd and checks the exact sequence of READ/WRITE
 * notifications across several timer-driven rounds, ending with an
 * EOF/drain phase after the peer end is closed.  Skips if the backend
 * is unavailable; fails via state.error on any protocol violation.
 */
378 static bool test_event_fd1(struct torture_context *tctx,
379 const void *test_data)
381 struct test_event_fd1_state state;
386 state.backend = (const char *)test_data;
388 state.ev = tevent_context_init_byname(tctx, state.backend);
389 if (state.ev == NULL) {
390 torture_skip(tctx, talloc_asprintf(tctx,
391 "event backend '%s' not supported\n",
396 tevent_set_debug_stderr(state.ev);
397 torture_comment(tctx, "backend '%s' - %s\n",
398 state.backend, __FUNCTION__);
401 * This tests the following:
403 * It monitors the state of state.sock[0]
404 * with tevent_fd, but we never read/write on state.sock[0]
405 * while state.sock[1] is only used to write a few bytes.
408 * - we wait only for TEVENT_FD_WRITE on state.sock[0]
409 * - we write 1 byte to state.sock[1]
410 * - we wait only for TEVENT_FD_READ on state.sock[0]
411 * - we disable events on state.sock[0]
412 * - the timer event restarts the loop
413 * Then we close state.sock[1]
415 * - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
416 * - we try to read 1 byte
417 * - if the read gets an error or returns 0
418 * we disable the event handler
419 * - the timer finishes the test
424 ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
425 torture_assert(tctx, ret == 0, "socketpair() failed");
427 state.te = tevent_add_timer(state.ev, state.ev,
428 timeval_current_ofs(0,1000),
429 test_event_fd1_finished, &state);
430 state.fde0 = tevent_add_fd(state.ev, state.ev,
431 state.sock[0], TEVENT_FD_WRITE,
432 test_event_fd1_fde_handler, &state);
433 /* state.fde1 is only used to auto close */
434 state.fde1 = tevent_add_fd(state.ev, state.ev,
436 test_event_fd1_fde_handler, &state);
438 tevent_fd_set_auto_close(state.fde0);
439 tevent_fd_set_auto_close(state.fde1);
441 while (!state.finished) {
443 if (tevent_loop_once(state.ev) == -1) {
444 talloc_free(state.ev);
445 torture_fail(tctx, talloc_asprintf(tctx,
446 "Failed event loop %s\n",
451 talloc_free(state.ev);
453 torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
/*
 * Shared state for test_event_fd2.  Each end of the socketpair gets a
 * test_event_fd2_sock record (fd, fde, byte counters, got_full flag —
 * several fields are not visible in this view) plus a back-pointer to
 * the outer state so the handler can find the "other" side.
 */
459 struct test_event_fd2_state {
460 struct torture_context *tctx;
462 struct tevent_context *ev;
463 struct tevent_timer *te;
464 struct test_event_fd2_sock {
465 struct test_event_fd2_state *state;
467 struct tevent_fd *fde;
/*
 * fd handler for one side of the test_event_fd2 socketpair.  While
 * writable and not yet "full", it writes one byte per event; once the
 * socket buffer is full it switches to reading until it has consumed
 * everything the other side wrote.  Cross-checks the reported flags
 * against the peer's progress and records __location__ in state->error
 * on any mismatch; finishes when both directions are fully drained.
 */
476 static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
477 struct tevent_fd *fde,
481 struct test_event_fd2_sock *cur_sock =
482 (struct test_event_fd2_sock *)private_data;
483 struct test_event_fd2_state *state = cur_sock->state;
484 struct test_event_fd2_sock *oth_sock = NULL;
/* identify which side we are, and which is the peer */
488 if (cur_sock == &state->sock0) {
489 oth_sock = &state->sock1;
491 oth_sock = &state->sock0;
/* the very first byte from the peer must arrive while we are still
 * also writable */
494 if (oth_sock->num_written == 1) {
495 if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
496 state->finished = true;
497 state->error = __location__;
502 if (cur_sock->num_read == oth_sock->num_written) {
503 state->finished = true;
504 state->error = __location__;
508 if (!(flags & TEVENT_FD_READ)) {
509 state->finished = true;
510 state->error = __location__;
514 if (oth_sock->num_read >= PIPE_BUF) {
516 * On Linux we become writable once we've read
517 * one byte. On Solaris we only become writable
518 * again once we've read 4096 bytes. PIPE_BUF
519 * is probably a safe bet to test against.
521 * There should be room to write a byte again
523 if (!(flags & TEVENT_FD_WRITE)) {
524 state->finished = true;
525 state->error = __location__;
/* fill phase: push one byte per writable event until the buffer fills */
530 if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
531 v = (uint8_t)cur_sock->num_written;
532 ret = write(cur_sock->fd, &v, 1);
534 state->finished = true;
535 state->error = __location__;
538 cur_sock->num_written++;
/* sanity bound — the socket buffer must fill up long before this */
539 if (cur_sock->num_written > 0x80000000) {
540 state->finished = true;
541 state->error = __location__;
547 if (!cur_sock->got_full) {
548 cur_sock->got_full = true;
550 if (!oth_sock->got_full) {
553 * lets wait for oth_sock
556 tevent_fd_set_flags(cur_sock->fde, 0);
561 * oth_sock waited for cur_sock,
564 tevent_fd_set_flags(oth_sock->fde,
565 TEVENT_FD_READ|TEVENT_FD_WRITE);
/* drain phase: read back one byte and verify its sequence value */
568 ret = read(cur_sock->fd, &v, 1);
570 state->finished = true;
571 state->error = __location__;
574 c = (uint8_t)cur_sock->num_read;
576 state->finished = true;
577 state->error = __location__;
580 cur_sock->num_read++;
582 if (cur_sock->num_read < oth_sock->num_written) {
583 /* there is more to read */
587 * we read everything, we need to remove TEVENT_FD_WRITE
590 TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
592 if (oth_sock->num_read == cur_sock->num_written) {
594 * both directions are finished
596 state->finished = true;
/*
 * Watchdog timer for test_event_fd2: the test should always complete
 * via the socket handlers, so if this fires it marks the test failed.
 */
602 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
603 struct tevent_timer *te,
607 struct test_event_fd2_state *state =
608 (struct test_event_fd2_state *)private_data;
611 * this should never be triggered
613 state->finished = true;
614 state->error = __location__;
/*
 * Per-backend flood test: both ends of an AF_UNIX socketpair write
 * until their send buffers are full, then read everything back,
 * verifying the READ/WRITE flag transitions along the way.  A 10-minute
 * watchdog timer fails the test if it ever fires.
 */
617 static bool test_event_fd2(struct torture_context *tctx,
618 const void *test_data)
620 struct test_event_fd2_state state;
626 state.backend = (const char *)test_data;
628 state.ev = tevent_context_init_byname(tctx, state.backend);
629 if (state.ev == NULL) {
630 torture_skip(tctx, talloc_asprintf(tctx,
631 "event backend '%s' not supported\n",
636 tevent_set_debug_stderr(state.ev);
637 torture_comment(tctx, "backend '%s' - %s\n",
638 state.backend, __FUNCTION__);
641 * This tests the following
643 * - We write 1 byte to each socket
644 * - We wait for TEVENT_FD_READ/WRITE on both sockets
645 * - When we get TEVENT_FD_WRITE we write 1 byte
646 * until both socket buffers are full, which
647 * means both sockets only get TEVENT_FD_READ.
648 * - Then we read 1 byte until we have consumed
649 * all bytes the other end has written.
/* NOTE(review): socketpair() return value is not checked here, unlike
 * test_event_fd1 — consider asserting it succeeds. */
653 socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
656 * the timer should never expire
658 state.te = tevent_add_timer(state.ev, state.ev,
659 timeval_current_ofs(600, 0),
660 test_event_fd2_finished, &state);
661 state.sock0.state = &state;
662 state.sock0.fd = sock[0];
663 state.sock0.fde = tevent_add_fd(state.ev, state.ev,
665 TEVENT_FD_READ | TEVENT_FD_WRITE,
666 test_event_fd2_sock_handler,
668 state.sock1.state = &state;
669 state.sock1.fd = sock[1];
670 state.sock1.fde = tevent_add_fd(state.ev, state.ev,
672 TEVENT_FD_READ | TEVENT_FD_WRITE,
673 test_event_fd2_sock_handler,
676 tevent_fd_set_auto_close(state.sock0.fde);
677 tevent_fd_set_auto_close(state.sock1.fde);
/* prime both directions with one byte each */
679 do_write(state.sock0.fd, &c, 1);
680 state.sock0.num_written++;
681 do_write(state.sock1.fd, &c, 1);
682 state.sock1.num_written++;
684 while (!state.finished) {
686 if (tevent_loop_once(state.ev) == -1) {
687 talloc_free(state.ev);
688 torture_fail(tctx, talloc_asprintf(tctx,
689 "Failed event loop %s\n",
694 talloc_free(state.ev);
696 torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
/*
 * State for the immediate-event performance test: a mutex exercised on
 * every callback, the start time, plus count/done fields (not visible
 * in this view).
 */
704 struct immediate_perf_state {
705 pthread_mutex_t mutex;
706 struct timeval start;
/*
 * Immediate-event callback that reschedules itself as fast as possible.
 * Each invocation takes and releases the state mutex (to include lock
 * overhead in the measurement), and every 1000 iterations checks the
 * elapsed time, stopping after roughly 10 seconds.
 */
711 static void immediate_perf_handler(struct tevent_context *ev,
712 struct tevent_immediate *im,
715 struct immediate_perf_state *state = private_data;
719 ret = pthread_mutex_lock(&state->mutex);
726 ret = pthread_mutex_unlock(&state->mutex);
733 if ((state->count % 1000) == 0) {
734 struct timeval now = tevent_timeval_current();
736 diff = tevent_timeval_until(&state->start, &now);
738 if (diff.tv_sec > 9) {
/* keep the benchmark loop spinning */
744 tevent_schedule_immediate(im, ev, immediate_perf_handler, state);
/*
 * Per-backend benchmark: schedules a self-rescheduling immediate event
 * and counts how many times it fires before the ~10s cutoff, reporting
 * the total via torture_comment.
 */
747 static bool test_event_context_immediate_perf(struct torture_context *test,
748 const void *test_data)
750 const char *backend = test_data;
751 struct tevent_context *ev;
752 struct tevent_immediate *im;
753 struct immediate_perf_state state = {
754 .start = tevent_timeval_current()
758 ret = pthread_mutex_init(&state.mutex, NULL);
759 torture_assert(test, ret == 0, "pthread_mutex_init failed");
761 ev = tevent_context_init_byname(test, backend);
763 torture_comment(test, "event backend '%s' not supported\n",
768 torture_comment(test, "backend '%s' - %s\n", backend, __FUNCTION__);
770 im = tevent_create_immediate(ev);
771 torture_assert(test, im != NULL, "tevent_create_immediate failed");
773 tevent_schedule_immediate(im, ev, immediate_perf_handler, &state);
775 while (!state.done) {
776 ret = tevent_loop_once(ev);
777 torture_assert(test, ret == 0, "tevent_loop_once failed");
780 torture_comment(test, "count=%"PRIu64"\n", state.count);
/* Mutex guarding the tevent context shared between the main thread and
 * the poll thread in the threaded_poll_mt test; do_shutdown tells the
 * poll thread to exit its loop. */
786 static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
787 static bool do_shutdown = false;
/* Acquire the shared threaded_mutex (return value checked in the
 * non-visible body lines). */
789 static void test_event_threaded_lock(void)
792 ret = pthread_mutex_lock(&threaded_mutex);
/* Release the shared threaded_mutex (return value checked in the
 * non-visible body lines). */
796 static void test_event_threaded_unlock(void)
799 ret = pthread_mutex_unlock(&threaded_mutex);
/*
 * tevent trace callback implementing the classic poll_mt locking
 * pattern: drop the shared mutex while the event loop blocks in its
 * wait call and retake it afterwards, so other threads can safely
 * modify the event context only while the loop is sleeping.
 */
803 static void test_event_threaded_trace(enum tevent_trace_point point,
807 case TEVENT_TRACE_BEFORE_WAIT:
808 test_event_threaded_unlock();
810 case TEVENT_TRACE_AFTER_WAIT:
811 test_event_threaded_lock();
813 case TEVENT_TRACE_BEFORE_LOOP_ONCE:
814 case TEVENT_TRACE_AFTER_LOOP_ONCE:
/*
 * 5-second timer for the threaded test; the visible lines show only
 * the signature, so its body (presumably a no-op or shutdown trigger —
 * TODO confirm) is not documented here.
 */
819 static void test_event_threaded_timer(struct tevent_context *ev,
820 struct tevent_timer *te,
821 struct timeval current_time,
/*
 * Secondary thread body: holds the shared mutex (except while the loop
 * blocks, courtesy of the trace callback) and runs tevent_loop_once on
 * the poll_mt context until told to shut down.
 */
827 static void *test_event_poll_thread(void *private_data)
829 struct tevent_context *ev = (struct tevent_context *)private_data;
831 test_event_threaded_lock();
835 ret = tevent_loop_once(ev);
838 test_event_threaded_unlock();
/*
 * fd handler added from the main thread while the poll thread runs the
 * loop: consumes one byte from the pipe (EINTR-safe), ignoring
 * non-READ wakeups.
 */
845 static void test_event_threaded_read_handler(struct tevent_context *ev,
846 struct tevent_fd *fde,
850 int *pfd = (int *)private_data;
854 if ((flags & TEVENT_FD_READ) == 0) {
859 nread = read(*pfd, &c, 1);
860 } while ((nread == -1) && (errno == EINTR));
/*
 * Tests the poll_mt backend's multi-thread support: a second thread
 * runs the event loop while the main thread — holding the shared mutex
 * — adds an fd event, writes to the pipe to trigger it, then requests
 * shutdown and joins the poll thread.
 */
865 static bool test_event_context_threaded(struct torture_context *test,
866 const void *test_data)
868 struct tevent_context *ev;
869 struct tevent_timer *te;
870 struct tevent_fd *fde;
871 pthread_t poll_thread;
876 ev = tevent_context_init_byname(test, "poll_mt");
877 torture_assert(test, ev != NULL, "poll_mt not supported");
879 tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
881 te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
882 test_event_threaded_timer, NULL);
883 torture_assert(test, te != NULL, "Could not add timer");
885 ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
886 torture_assert(test, ret == 0, "Could not create poll thread");
889 torture_assert(test, ret == 0, "Could not create pipe");
/* mutate the event context only while holding the shared mutex */
893 test_event_threaded_lock();
895 fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
896 test_event_threaded_read_handler, &fds[0]);
897 torture_assert(test, fde != NULL, "Could not add fd event");
899 test_event_threaded_unlock();
903 do_write(fds[1], &c, 1);
907 test_event_threaded_lock();
909 test_event_threaded_unlock();
/* wake the poll thread so it notices the shutdown request */
911 do_write(fds[1], &c, 1);
913 ret = pthread_join(poll_thread, NULL);
914 torture_assert(test, ret == 0, "pthread_join failed");
/* Globals shared by the multi-threaded proxy/immediate tests below:
 * the torture context (needed by torture_comment from callbacks), the
 * per-thread pthread ids, and a counter of completed callbacks. */
919 #define NUM_TEVENT_THREADS 100
921 /* Ugly, but needed for torture_comment... */
922 static struct torture_context *thread_test_ctx;
923 static pthread_t thread_map[NUM_TEVENT_THREADS];
924 static unsigned thread_counter;
/* Called in master thread context */
/*
 * Immediate callback scheduled by a worker thread via the thread
 * proxy: looks up which thread sent it (by pthread_equal against
 * thread_map) and logs it; the visible lines don't show the counter
 * update, but thread_counter is presumably incremented here too —
 * TODO confirm.
 */
927 static void callback_nowait(struct tevent_context *ev,
928 struct tevent_immediate *im,
931 pthread_t *thread_id_ptr =
932 talloc_get_type_abort(private_ptr, pthread_t);
935 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
936 if (pthread_equal(*thread_id_ptr,
941 torture_comment(thread_test_ctx,
942 "Callback %u from thread %u\n",
/* Blast the master tevent_context with a callback, no waiting. */
/*
 * Worker thread body for test_multi_tevent_threaded: creates an
 * immediate event and a heap copy of its own thread id, then fires
 * callback_nowait at the master context via the thread proxy and
 * exits without waiting for a reply.
 */
949 static void *thread_fn_nowait(void *private_ptr)
951 struct tevent_thread_proxy *master_tp =
952 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
953 struct tevent_immediate *im;
954 pthread_t *thread_id_ptr;
956 im = tevent_create_immediate(NULL);
960 thread_id_ptr = talloc(NULL, pthread_t);
961 if (thread_id_ptr == NULL) {
964 *thread_id_ptr = pthread_self();
966 tevent_thread_proxy_schedule(master_tp,
/*
 * Master-loop watchdog: forces thread_counter past the exit threshold
 * so the waiting loop terminates even if callbacks were lost.
 */
973 static void timeout_fn(struct tevent_context *ev,
974 struct tevent_timer *te,
975 struct timeval tv, void *p)
977 thread_counter = NUM_TEVENT_THREADS * 10;
/*
 * Fire-and-forget thread proxy test: NUM_TEVENT_THREADS workers each
 * schedule one immediate callback on the master context via a
 * tevent_thread_proxy; the master loops until all callbacks arrived
 * (or a 10s watchdog fires, which makes the final count assert fail).
 */
980 static bool test_multi_tevent_threaded(struct torture_context *test,
981 const void *test_data)
984 struct tevent_context *master_ev;
985 struct tevent_thread_proxy *tp;
/* immediates are parented to NULL between threads; don't track them */
987 talloc_disable_null_tracking();
989 /* Ugly global stuff. */
990 thread_test_ctx = test;
993 master_ev = tevent_context_init(NULL);
994 if (master_ev == NULL) {
997 tevent_set_debug_stderr(master_ev);
999 tp = tevent_thread_proxy_create(master_ev);
1002 talloc_asprintf(test,
1003 "tevent_thread_proxy_create failed\n"));
1004 talloc_free(master_ev);
1008 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1009 int ret = pthread_create(&thread_map[i],
1015 talloc_asprintf(test,
1016 "Failed to create thread %i, %d\n",
1022 /* Ensure we don't wait more than 10 seconds. */
1023 tevent_add_timer(master_ev,
1025 timeval_current_ofs(10,0),
1029 while (thread_counter < NUM_TEVENT_THREADS) {
1030 int ret = tevent_loop_once(master_ev);
1031 torture_assert(test, ret == 0, "tevent_loop_once failed");
1034 torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1035 "thread_counter fail\n");
1037 talloc_free(master_ev);
/*
 * Request/reply payload passed between a worker thread and the master
 * in test_multi_tevent_threaded_1: the worker's reply proxy, its
 * thread id, and (not visible here) a pointer to its finished flag.
 */
1041 struct reply_state {
1042 struct tevent_thread_proxy *reply_tp;
1043 pthread_t thread_id;
/*
 * Per-worker watchdog: sets the worker's finished flag so its private
 * event loop exits even if the master never replies.
 */
1047 static void thread_timeout_fn(struct tevent_context *ev,
1048 struct tevent_timer *te,
1049 struct timeval tv, void *p)
1051 int *p_finished = (int *)p;
/* Called in child-thread context */
/*
 * Reply handler running in the worker's own event context: takes
 * ownership of the reply_state (talloc_steal onto ev) and marks the
 * worker finished so its loop can exit.
 */
1057 static void thread_callback(struct tevent_context *ev,
1058 struct tevent_immediate *im,
1061 struct reply_state *rsp =
1062 talloc_get_type_abort(private_ptr, struct reply_state);
1064 talloc_steal(ev, rsp);
1065 *rsp->p_finished = 1;
/* Called in master thread context */
/*
 * Master-side handler for a worker's request: identifies the sending
 * thread, logs it, and schedules thread_callback back on the worker's
 * reply proxy — completing the request/reply round trip.
 */
1069 static void master_callback(struct tevent_context *ev,
1070 struct tevent_immediate *im,
1073 struct reply_state *rsp =
1074 talloc_get_type_abort(private_ptr, struct reply_state);
1077 talloc_steal(ev, rsp);
1079 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1080 if (pthread_equal(rsp->thread_id,
1085 torture_comment(thread_test_ctx,
1086 "Callback %u from thread %u\n",
1089 /* Now reply to the thread ! */
1090 tevent_thread_proxy_schedule(rsp->reply_tp,
/*
 * Worker thread body for the request/reply test: builds its own
 * tevent context, reply proxy and reply_state, sends a request to the
 * master proxy, then runs its private loop (with a 10s watchdog)
 * until the master's reply sets "finished".  Returns its tevent
 * context so the parent can free it (see the helgrind note below).
 */
1100 struct tevent_thread_proxy *master_tp =
1101 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1102 struct tevent_thread_proxy *tp;
1103 struct tevent_immediate *im;
1104 struct tevent_context *ev;
1105 struct reply_state *rsp;
1109 ev = tevent_context_init(NULL);
1114 tp = tevent_thread_proxy_create(ev);
1120 im = tevent_create_immediate(ev);
1126 rsp = talloc(ev, struct reply_state);
1132 rsp->thread_id = pthread_self();
1134 rsp->p_finished = &finished;
1136 /* Introduce a little randomness into the mix.. */
1137 usleep(random() % 7000);
1139 tevent_thread_proxy_schedule(master_tp,
1144 /* Ensure we don't wait more than 10 seconds. */
1145 tevent_add_timer(ev,
1147 timeval_current_ofs(10,0),
1151 while (finished == 0) {
1152 ret = tevent_loop_once(ev);
1162 * NB. We should talloc_free(ev) here, but if we do
1163 * we currently get hit by helgrind Fix #323432
1164 * "When calling pthread_cond_destroy or pthread_mutex_destroy
1165 * with initializers as argument Helgrind (incorrectly) reports errors."
1167 * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1168 * with-pthread-mutex-destroy-td47757.html
1170 * Helgrind doesn't understand that the request/reply
1171 * messages provide synchronization between the lock/unlock
1172 * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1173 * when the struct tevent_thread_proxy object is talloc_free'd.
1175 * As a work-around for now return ev for the parent thread to free.
/*
 * Request/reply thread proxy test: NUM_TEVENT_THREADS workers each
 * send a request to the master proxy and wait for an individual reply
 * on their own proxy.  The master loops until all requests were seen,
 * then joins every worker and frees the tevent context each returned.
 */
1180 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1181 const void *test_data)
1184 struct tevent_context *master_ev;
1185 struct tevent_thread_proxy *master_tp;
1188 talloc_disable_null_tracking();
1190 /* Ugly global stuff. */
1191 thread_test_ctx = test;
1194 master_ev = tevent_context_init(NULL);
1195 if (master_ev == NULL) {
1198 tevent_set_debug_stderr(master_ev);
1200 master_tp = tevent_thread_proxy_create(master_ev);
1201 if (master_tp == NULL) {
1203 talloc_asprintf(test,
1204 "tevent_thread_proxy_create failed\n"));
1205 talloc_free(master_ev);
1209 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1210 ret = pthread_create(&thread_map[i],
1216 talloc_asprintf(test,
1217 "Failed to create thread %i, %d\n",
1223 while (thread_counter < NUM_TEVENT_THREADS) {
1224 ret = tevent_loop_once(master_ev);
1225 torture_assert(test, ret == 0, "tevent_loop_once failed");
1228 /* Wait for all the threads to finish - join 'em. */
1229 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1231 ret = pthread_join(thread_map[i], &retval);
1232 torture_assert(test, ret == 0, "pthread_join failed");
1233 /* Free the child thread event context. */
1234 talloc_free(retval);
1237 talloc_free(master_ev);
/*
 * Per-thread state for test_multi_tevent_threaded_2: the shared
 * tevent_threaded_context, the immediate event this thread fires on
 * the master loop, and the thread's own id for identification in the
 * callback.
 */
1241 struct threaded_test_2 {
1242 struct tevent_threaded_context *tctx;
1243 struct tevent_immediate *im;
1244 pthread_t thread_id;
/* forward declaration — defined after thread_fn_2, which references it */
1247 static void master_callback_2(struct tevent_context *ev,
1248 struct tevent_immediate *im,
1249 void *private_data);
/*
 * Worker thread body for the tevent_threaded_context API test: records
 * its own id, sleeps a random few ms to shuffle ordering, then fires
 * its immediate on the master loop via
 * tevent_threaded_schedule_immediate.
 */
1251 static void *thread_fn_2(void *private_data)
1253 struct threaded_test_2 *state = private_data;
1255 state->thread_id = pthread_self();
1257 usleep(random() % 7000);
1259 tevent_threaded_schedule_immediate(
1260 state->tctx, state->im, master_callback_2, state);
/*
 * Master-loop handler for thread_fn_2's immediates: identifies the
 * sending thread via thread_map and logs it (the thread_counter
 * increment is in body lines not visible here — TODO confirm).
 */
1265 static void master_callback_2(struct tevent_context *ev,
1266 struct tevent_immediate *im,
1269 struct threaded_test_2 *state = private_data;
1272 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1273 if (pthread_equal(state->thread_id, thread_map[i])) {
1277 torture_comment(thread_test_ctx,
1278 "Callback_2 %u from thread %u\n",
/*
 * Tests the newer tevent_threaded_context API (and a regression in
 * tevent_re_initialise): each worker fires one threaded immediate at
 * the master loop; the master waits for all of them, then joins the
 * workers.
 */
1284 static bool test_multi_tevent_threaded_2(struct torture_context *test,
1285 const void *test_data)
1289 struct tevent_context *ev;
1290 struct tevent_threaded_context *tctx;
1293 thread_test_ctx = test;
1296 ev = tevent_context_init(test);
1297 torture_assert(test, ev != NULL, "tevent_context_init failed");
1300 * tevent_re_initialise used to have a bug where it did not
1301 * re-initialise the thread support after taking it
1302 * down. Exercise that code path.
1304 ret = tevent_re_initialise(ev);
1305 torture_assert(test, ret == 0, "tevent_re_initialise failed");
1307 tctx = tevent_threaded_context_create(ev, ev);
1308 torture_assert(test, tctx != NULL,
1309 "tevent_threaded_context_create failed");
1311 for (i=0; i<NUM_TEVENT_THREADS; i++) {
1312 struct threaded_test_2 *state;
1314 state = talloc(ev, struct threaded_test_2);
1315 torture_assert(test, state != NULL, "talloc failed");
1318 state->im = tevent_create_immediate(state);
1319 torture_assert(test, state->im != NULL,
1320 "tevent_create_immediate failed");
1322 ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1323 torture_assert(test, ret == 0, "pthread_create failed");
1326 while (thread_counter < NUM_TEVENT_THREADS) {
1327 ret = tevent_loop_once(ev);
1328 torture_assert(test, ret == 0, "tevent_loop_once failed");
1331 /* Wait for all the threads to finish - join 'em. */
1332 for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1334 ret = pthread_join(thread_map[i], &retval);
1335 torture_assert(test, ret == 0, "pthread_join failed");
1336 /* Free the child thread event context. */
/*
 * Per-request state for the threadpool test (fields such as id and msg
 * are not visible in this view), plus forward declarations for the
 * worker job and its completion callback.
 */
1344 struct tevent_threaded_test_state {
1350 static void tevent_threaded_test_do(void *private_data);
1351 static void tevent_threaded_test_done(struct tevent_req *subreq);
/*
 * Threadpool smoke test: creates a 10-thread tevent_threadpool, sends
 * NUMREQ requests through it (tevent_threaded_test_do runs in a worker
 * thread, tevent_threaded_test_done in the main loop), and waits for
 * the loop to drain.
 */
1353 static bool tevent_threaded_test(struct torture_context *test,
1354 const void *test_data)
1358 struct tevent_context *ev;
1359 struct tevent_threadpool *pool;
1360 struct tevent_threaded_test_state *state;
1362 struct tevent_req *subreq[NUMREQ];
1365 talloc_disable_null_tracking();
1367 ev = tevent_context_init(NULL);
1368 torture_assert_goto(test, ev != NULL, ok, done,
1369 "tevent_context_init failed\n");
1371 tevent_set_debug_stderr(ev);
1373 pool = tevent_threadpool_create(ev, ev, 10);
1374 torture_assert_goto(test, pool != NULL, ok, done,
1375 "tevent_threaded_init failed\n");
1378 * Starting at 1 helps debugging as the threadpool uses job
1381 for (i = 1; i <= NUMREQ; i++) {
1383 printf("[mainthread] Sending request %d\n", i);
1385 state = talloc_zero(NULL, struct tevent_threaded_test_state);
1386 torture_assert_goto(test, state != NULL, ok, done,
1387 "talloc_zero failed\n");
1390 state->msg = talloc_asprintf(state, "request %d", state->id);
1391 torture_assert_goto(test, state->msg != NULL, ok, done,
1392 "talloc_asprintf failed\n");
1394 subreq[i-1] = tevent_threadpool_send(pool,
1395 tevent_threaded_test_do,
1397 torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
1398 "tevent_threadpool_send failed\n");
1399 tevent_req_set_callback(subreq[i-1], tevent_threaded_test_done, state);
1402 result = tevent_loop_wait(ev);
1403 torture_assert_goto(test, result == 0, ok, done,
1404 "tevent_loop_wait failed\n");
/*
 * Runs in a threadpool worker thread: logs the job, then replaces
 * state->msg with a response string for the main-thread callback to
 * print.
 */
1413 static void tevent_threaded_test_do(void *private_data)
1415 struct tevent_threaded_test_state *state =
1416 talloc_get_type_abort(private_data, struct tevent_threaded_test_state);
1418 printf("[workerthread] job id: %d, msg: %s\n", state->id, state->msg);
1419 talloc_free(state->msg);
1421 state->msg = talloc_asprintf(state, "Thread %d responding", state->id);
1422 if (state->msg == NULL) {
/*
 * Main-thread completion callback: prints the worker's response,
 * collects the result via tevent_threadpool_recv (logging recv or
 * computation failure), and frees the request.
 */
1429 static void tevent_threaded_test_done(struct tevent_req *subreq)
1432 struct tevent_threaded_test_state *state =
1433 tevent_req_callback_data(subreq, struct tevent_threaded_test_state);
1435 printf("[mainthread] callback: %d done, %s\n", state->id, state->msg);
1437 result = tevent_threadpool_recv(subreq, &error);
1439 printf("[mainthread] recv failed!\n");
1442 printf("[mainthread] failed!\n");
1445 TALLOC_FREE(subreq);
/*
 * Per-request state for the threadpool cancel test: keeps the torture
 * context and a pointer to the overall pass/fail flag so callbacks can
 * fail the test (id/msg fields not visible in this view).  Followed by
 * forward declarations of the worker job and completion callback.
 */
1449 struct tevent_threaded_canceltest_state {
1450 struct torture_context *tctx;
1451 bool *torture_result;
1457 static void tevent_threaded_canceltest_do(void *private_data);
1458 static void tevent_threaded_canceltest_done(struct tevent_req *subreq);
/*
 * Threadpool cancellation test: with a single-thread pool, sends two
 * requests (the first sleeps, so the second is still queued), cancels
 * the queued one by freeing its request, and verifies its callback
 * never runs.  Then reuses the cancelled request's state for a third
 * request to prove state reuse after cancellation works.
 */
1460 static bool tevent_threaded_canceltest(struct torture_context *test,
1461 const void *test_data)
1465 struct tevent_context *ev;
1466 struct tevent_threadpool *pool;
1467 struct tevent_threaded_canceltest_state *state;
1468 struct tevent_req *subreq[3];
1471 talloc_disable_null_tracking();
1473 ev = tevent_context_init(NULL);
1474 torture_assert_goto(test, ev != NULL, ok, done,
1475 "tevent_context_init failed\n");
1477 tevent_set_debug_stderr(ev);
/* single worker thread: guarantees request 2 queues behind request 1 */
1479 pool = tevent_threadpool_create(ev, ev, 1);
1480 torture_assert_goto(test, pool != NULL, ok, done,
1481 "tevent_threaded_init failed\n");
1483 for (i = 1; i <= 2; i++) {
1484 printf("[mainthread] Sending request %d\n", i);
1486 state = talloc_zero(pool, struct tevent_threaded_canceltest_state);
1487 torture_assert_goto(test, state != NULL, ok, done,
1488 "talloc_zero failed\n");
1492 state->torture_result = &ok;
1493 state->msg = talloc_asprintf(state, "request %d", state->id);
1494 torture_assert_goto(test, state->msg != NULL, ok, done,
1495 "talloc_asprintf failed\n");
1497 subreq[i-1] = tevent_threadpool_send(pool,
1498 tevent_threaded_canceltest_do,
1500 torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
1501 "tevent_threadpool_send failed\n");
1502 tevent_req_set_callback(subreq[i-1], tevent_threaded_canceltest_done, state);
/* cancel the still-queued second request */
1505 TALLOC_FREE(subreq[1]);
1507 result = tevent_loop_wait(ev);
1508 torture_assert_goto(test, result == 0, ok, done,
1509 "tevent_loop_wait failed\n");
1511 printf("sleeping...\n");
1515 * Now test reusing state of the request we cancelled via
1519 talloc_free(state->msg);
1520 state->msg = talloc_asprintf(state, "request %d", state->id);
1521 torture_assert_goto(test, state->msg != NULL, ok, done,
1522 "talloc_asprintf failed\n");
1523 subreq[2] = tevent_threadpool_send(pool,
1524 tevent_threaded_canceltest_do,
1526 torture_assert_goto(test, subreq[2] != NULL, ok, done,
1527 "tevent_threadpool_send failed\n");
1528 tevent_req_set_callback(subreq[2], tevent_threaded_canceltest_done, state);
1530 result = tevent_loop_wait(ev);
1531 torture_assert_goto(test, result == 0, ok, done,
1532 "tevent_loop_wait failed\n");
1535 torture_fail(test, "some error...\n");
/*
 * Worker-thread job for the cancel test: the first job sleeps long
 * enough for the cancellation to happen, then replaces state->msg with
 * a completion string.
 */
1543 static void tevent_threaded_canceltest_do(void *private_data)
1545 struct tevent_threaded_canceltest_state *state =
1546 talloc_get_type_abort(private_data, struct tevent_threaded_canceltest_state);
1548 printf("[workerthread] %s\n", state->msg);
1551 * Sleep in the first scheduled job, so the second can be
1552 * cancelled before it gets scheduled
1556 talloc_free(state->msg);
1557 state->msg = talloc_asprintf(state, "job id %d done", state->id);
1558 if (state->msg == NULL) {
/*
 * Main-thread completion callback for the cancel test: flags the test
 * as failed if a cancelled request's callback fires (EINTR from recv,
 * or job id 2 showing up at all), or if the computation failed.
 */
1565 static void tevent_threaded_canceltest_done(struct tevent_req *subreq)
1568 struct tevent_threaded_canceltest_state *state =
1569 tevent_req_callback_data(subreq, struct tevent_threaded_canceltest_state);
1571 printf("[mainthread] callback job %d\n", state->id);
1573 result = tevent_threadpool_recv(subreq, &error);
1575 if (error == EINTR) {
1576 printf("[mainthread] cancelled request\n");
1577 *state->torture_result = false;
1579 printf("[mainthread] recv failed!\n");
1582 printf("[mainthread] computation failed\n");
1583 *state->torture_result = false;
/* job 2 was cancelled before running — its callback must never fire */
1588 if (state->id == 2) {
1589 printf("[mainthread] job 2's callback shouldn't be called!\n");
1590 *state->torture_result = false;
1593 TALLOC_FREE(subreq);
/*
 * Builds the "event" torture suite: one sub-suite per available tevent
 * backend (context, fd1, fd2, immediate_perf tests), plus the
 * backend-independent threaded and threadpool tests.
 */
1598 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1600 struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1601 const char **list = tevent_backend_list(suite);
1604 for (i=0;list && list[i];i++) {
1605 struct torture_suite *backend_suite;
1607 backend_suite = torture_suite_create(mem_ctx, list[i]);
1609 torture_suite_add_simple_tcase_const(backend_suite,
1612 (const void *)list[i]);
1613 torture_suite_add_simple_tcase_const(backend_suite,
1616 (const void *)list[i]);
1617 torture_suite_add_simple_tcase_const(backend_suite,
1620 (const void *)list[i]);
1621 torture_suite_add_simple_tcase_const(
1622 backend_suite, "immediate_perf",
1623 test_event_context_immediate_perf,
1627 torture_suite_add_suite(suite, backend_suite);
1631 torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1632 test_event_context_threaded,
1635 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1636 test_multi_tevent_threaded,
1639 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1640 test_multi_tevent_threaded_1,
1643 torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1644 test_multi_tevent_threaded_2,
1647 torture_suite_add_simple_tcase_const(suite, "tevent_threaded_test",
1648 tevent_threaded_test,
1652 torture_suite_add_simple_tcase_const(suite, "tevent_threaded_canceltest",
1653 tevent_threaded_canceltest,
1660 struct torture_suite *torture_local_event_perf(TALLOC_CTX *mem_ctx)
1662 struct torture_suite *suite = torture_suite_create(
1663 mem_ctx, "event_perf");
1666 torture_suite_add_simple_tcase_const(suite, "immediate_perf",
1667 test_event_context_immediate_perf,