f0c9d55b75f367a13d8a9dc3b5210233650ce34e
[metze/samba/wip.git] / lib / tevent / testsuite.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    testing of the events subsystem
5
6    Copyright (C) Stefan Metzmacher 2006-2009
7    Copyright (C) Jeremy Allison    2013
8
9      ** NOTE! The following LGPL license applies to the tevent
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "includes.h"
28 #include "tevent.h"
29 #include "system/filesys.h"
30 #include "system/select.h"
31 #include "system/network.h"
32 #include "torture/torture.h"
33 #include "torture/local/proto.h"
34
/*
 * NOTE(review): this #undef unconditionally disables every pthread-based
 * test below (immediate perf, poll_mt threading, thread-proxy tests),
 * even on builds that do provide pthreads.  It looks like debugging
 * leftover — confirm whether it should be removed.
 */
#undef HAVE_PTHREAD

#ifdef HAVE_PTHREAD
#include <pthread.h>
#include <assert.h>
#endif
41
/* Number of pipe read events seen by the fd handlers; reset at the
 * start of each test_event_context() run. */
static int fde_count;
43
/*
 * Issue a single read(2) on fd, retrying only while the call is
 * interrupted by a signal (EINTR).  Short reads and other errors are
 * deliberately ignored: the testsuite only needs a best-effort read.
 */
static void do_read(int fd, void *buf, size_t count)
{
        for (;;) {
                ssize_t n = read(fd, buf, count);

                if (n != -1 || errno != EINTR) {
                        return;
                }
        }
}
52
/*
 * Read-end fd handler for test_event_context(): raises SIGUSR1 (when
 * SA_SIGINFO is available) and SIGALRM so the signal handlers registered
 * by the test fire once per pipe event, then consumes one byte from the
 * pipe and counts the event in the global fde_count.
 */
static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *fd = (int *)private_data;	/* pipe fd pair owned by the test */
	char c;
#ifdef SA_SIGINFO
	kill(getpid(), SIGUSR1);
#endif
	kill(getpid(), SIGALRM);

	do_read(fd[0], &c, 1);
	fde_count++;
}
66
/*
 * Issue a single write(2) on fd, retrying only while the call is
 * interrupted by a signal (EINTR).  Short writes and other errors are
 * deliberately ignored: the testsuite only needs a best-effort write.
 */
static void do_write(int fd, void *buf, size_t count)
{
        for (;;) {
                ssize_t n = write(fd, buf, count);

                if (n != -1 || errno != EINTR) {
                        return;
                }
        }
}
75
/*
 * Write-end fd handler for test_event_context(): pushes a single zero
 * byte into the pipe so the read-end handler has something to consume.
 */
static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *pipefds = (int *)private_data;
	char zero = 0;

	do_write(pipefds[1], &zero, 1);
}
84
85
/* This will only fire if the fd's returned from pipe() are bi-directional. */
/*
 * Like fde_handler_read(), but reads from fd[1] instead of fd[0]:
 * raises the test signals, consumes one byte and counts the event.
 */
static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *fd = (int *)private_data;	/* pipe fd pair owned by the test */
	char c;
#ifdef SA_SIGINFO
	kill(getpid(), SIGUSR1);
#endif
	kill(getpid(), SIGALRM);

	do_read(fd[1], &c, 1);
	fde_count++;
}
100
/*
 * This will only fire if the fd's returned from pipe() are
 * bi-directional: writes a single zero byte into fd[0].
 */
static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *pipefds = (int *)private_data;
	char zero = 0;

	do_write(pipefds[0], &zero, 1);
}
109
/*
 * Timer handler: flips the int pointed to by private_data to 1 so the
 * caller's event loop terminates.
 */
static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
			     struct timeval tval, void *private_data)
{
	int *finished_flag = private_data;

	*finished_flag = 1;
}
116
/*
 * Signal handler: adds the number of signals delivered in this batch
 * (count) to the int counter passed via private_data.
 */
static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
			  int signum, int count, void *info, void *private_data)
{
	int *counter = private_data;

	*counter += count;
}
123
/*
 * Basic smoke test for a tevent backend: wires four fd events onto a
 * pipe (read+write on each end), a 2 second "finished" timer and three
 * signal handlers (SA_RESTART/SA_RESETHAND on SIGALRM, SA_SIGINFO on
 * SIGUSR1), then runs the loop and cross-checks the event counters.
 * Returns true on success; skips (returns true) if the backend is not
 * compiled in.
 */
static bool test_event_context(struct torture_context *test,
			       const void *test_data)
{
	struct tevent_context *ev_ctx;
	int fd[2] = { -1, -1 };
	const char *backend = (const char *)test_data;
	int alarm_count=0, info_count=0;
	struct tevent_fd *fde_read;
	struct tevent_fd *fde_read_1;
	struct tevent_fd *fde_write;
	struct tevent_fd *fde_write_1;
#ifdef SA_RESTART
	struct tevent_signal *se1 = NULL;
#endif
#ifdef SA_RESETHAND
	struct tevent_signal *se2 = NULL;
#endif
#ifdef SA_SIGINFO
	struct tevent_signal *se3 = NULL;
#endif
	int finished=0;
	struct timeval t;
	int ret;

	ev_ctx = tevent_context_init_byname(test, backend);
	if (ev_ctx == NULL) {
		torture_comment(test, "event backend '%s' not supported\n", backend);
		return true;
	}

	torture_comment(test, "backend '%s' - %s\n",
			backend, __FUNCTION__);

	/* reset globals */
	fde_count = 0;

	/* create a pipe */
	ret = pipe(fd);
	torture_assert_int_equal(test, ret, 0, "pipe failed");

	/*
	 * Read/write handlers on both pipe ends; the *_1 handlers only
	 * fire on platforms where pipe() fds are bi-directional.
	 */
	fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
			    fde_handler_read, fd);
	fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
			    fde_handler_write_1, fd);

	fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
			    fde_handler_write, fd);
	fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
			    fde_handler_read_1, fd);

	/* fd[0]/fd[1] get closed when these events are freed */
	tevent_fd_set_auto_close(fde_read);
	tevent_fd_set_auto_close(fde_write);

	/* terminate the first loop after 2 seconds */
	tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
			 finished_handler, &finished);

#ifdef SA_RESTART
	se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
	torture_assert(test, se1 != NULL, "failed to setup se1");
#endif
#ifdef SA_RESETHAND
	/* SA_RESETHAND: se2 fires exactly once, then removes itself */
	se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
	torture_assert(test, se2 != NULL, "failed to setup se2");
#endif
#ifdef SA_SIGINFO
	se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
	torture_assert(test, se3 != NULL, "failed to setup se3");
#endif

	t = timeval_current();
	while (!finished) {
		errno = 0;
		if (tevent_loop_once(ev_ctx) == -1) {
			TALLOC_FREE(ev_ctx);
			torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
			return false;
		}
	}

	talloc_free(fde_read_1);
	talloc_free(fde_write_1);
	talloc_free(fde_read);
	talloc_free(fde_write);

	/* drain any still-pending SIGALRM deliveries */
	while (alarm_count < fde_count+1) {
		if (tevent_loop_once(ev_ctx) == -1) {
			break;
		}
	}

	torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));

#ifdef SA_RESTART
	talloc_free(se1);
#endif

	/*
	 * se1 counted one SIGALRM per pipe event, se2 (SA_RESETHAND)
	 * contributed exactly one more before removing itself.
	 */
	torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");

#ifdef SA_RESETHAND
	/*
	 * we do not call talloc_free(se2)
	 * because it is already gone,
	 * after triggering the event handler.
	 */
#endif

#ifdef SA_SIGINFO
	talloc_free(se3);
	torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
#endif

	talloc_free(ev_ctx);

	return true;
}
239
/* Shared state for test_event_fd1() and its handlers. */
struct test_event_fd1_state {
	struct torture_context *tctx;	/* torture test context */
	const char *backend;		/* tevent backend under test */
	struct tevent_context *ev;
	int sock[2];			/* AF_UNIX socketpair */
	struct tevent_timer *te;	/* timer driving each loop iteration */
	struct tevent_fd *fde0;		/* fd event on sock[0], the one being tested */
	struct tevent_fd *fde1;		/* fd event on sock[1], only used for auto-close */
	bool got_write;			/* saw TEVENT_FD_WRITE in the current cycle */
	bool got_read;			/* saw TEVENT_FD_READ in the current cycle */
	bool drain;			/* final phase: drain sock[0] until EOF */
	bool drain_done;		/* drain finished; test may complete */
	unsigned loop_count;		/* completed write/read cycles */
	bool finished;			/* main loop exit flag */
	const char *error;		/* __location__ of first failure, or NULL */
};
256
/*
 * fd handler for test_event_fd1(), a small state machine:
 *
 *  1. got_write == false: expect exactly TEVENT_FD_WRITE, write one
 *     byte to sock[1], then switch fde0 to wait for READ only.
 *  2. got_read == false: expect exactly TEVENT_FD_READ, then disable
 *     all events; the timer handler restarts the cycle.
 *  3. drain: after fde1/sock[1] were closed, read from sock[0] until
 *     EOF/error, then disable events and set drain_done.
 *
 * Any unexpected flags (or a callback after drain_done) aborts the
 * test, recording the failing __location__.
 */
static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
				       struct tevent_fd *fde,
				       uint16_t flags,
				       void *private_data)
{
	struct test_event_fd1_state *state =
		(struct test_event_fd1_state *)private_data;

	/* no events are expected once draining has completed */
	if (state->drain_done) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	if (state->drain) {
		ssize_t ret;
		uint8_t c = 0;

		if (!(flags & TEVENT_FD_READ)) {
			state->finished = true;
			state->error = __location__;
			return;
		}

		ret = read(state->sock[0], &c, 1);
		if (ret == 1) {
			return;
		}

		/*
		 * end of test...
		 */
		tevent_fd_set_flags(fde, 0);
		state->drain_done = true;
		return;
	}

	if (!state->got_write) {
		uint8_t c = 0;

		if (flags != TEVENT_FD_WRITE) {
			state->finished = true;
			state->error = __location__;
			return;
		}
		state->got_write = true;

		/*
		 * we write to the other socket...
		 */
		do_write(state->sock[1], &c, 1);
		TEVENT_FD_NOT_WRITEABLE(fde);
		TEVENT_FD_READABLE(fde);
		return;
	}

	if (!state->got_read) {
		if (flags != TEVENT_FD_READ) {
			state->finished = true;
			state->error = __location__;
			return;
		}
		state->got_read = true;

		/* wait for the timer to restart the cycle */
		TEVENT_FD_NOT_READABLE(fde);
		return;
	}

	/* both flags already seen: this callback should not happen */
	state->finished = true;
	state->error = __location__;
	return;
}
329
/*
 * Timer handler for test_event_fd1(): verifies that the previous
 * write/read cycle completed, then either restarts the cycle, switches
 * to the drain phase (after 3 cycles, by freeing fde1 which closes
 * sock[1]), or — once drain_done is set — finishes the test cleanly.
 */
static void test_event_fd1_finished(struct tevent_context *ev_ctx,
				    struct tevent_timer *te,
				    struct timeval tval,
				    void *private_data)
{
	struct test_event_fd1_state *state =
		(struct test_event_fd1_state *)private_data;

	if (state->drain_done) {
		state->finished = true;
		return;
	}

	/* the last cycle must have seen its write event... */
	if (!state->got_write) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	/* ...and its read event */
	if (!state->got_read) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	state->loop_count++;
	if (state->loop_count > 3) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	/* restart the write/read cycle */
	state->got_write = false;
	state->got_read = false;

	tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);

	if (state->loop_count > 2) {
		/* freeing fde1 auto-closes sock[1], triggering EOF */
		state->drain = true;
		TALLOC_FREE(state->fde1);
		TEVENT_FD_READABLE(state->fde0);
	}

	state->te = tevent_add_timer(state->ev, state->ev,
				    timeval_current_ofs(0,2000),
				    test_event_fd1_finished, state);
}
377
/*
 * Test that a single tevent_fd on one end of a socketpair reports
 * TEVENT_FD_READ/TEVENT_FD_WRITE correctly across several enable/
 * disable cycles, including EOF handling after the peer is closed.
 * Returns true on success; skips if the backend is not compiled in.
 */
static bool test_event_fd1(struct torture_context *tctx,
			   const void *test_data)
{
	struct test_event_fd1_state state;
	int ret;

	ZERO_STRUCT(state);
	state.tctx = tctx;
	state.backend = (const char *)test_data;

	state.ev = tevent_context_init_byname(tctx, state.backend);
	if (state.ev == NULL) {
		torture_skip(tctx, talloc_asprintf(tctx,
			     "event backend '%s' not supported\n",
			     state.backend));
		return true;
	}

	tevent_set_debug_stderr(state.ev);
	torture_comment(tctx, "backend '%s' - %s\n",
			state.backend, __FUNCTION__);

	/*
	 * This tests the following:
	 *
	 * It monitors the state of state.sock[0]
	 * with tevent_fd, but we never read/write on state.sock[0]
	 * while state.sock[1] * is only used to write a few bytes.
	 *
	 * We have a loop:
	 *   - we wait only for TEVENT_FD_WRITE on state.sock[0]
	 *   - we write 1 byte to state.sock[1]
	 *   - we wait only for TEVENT_FD_READ on state.sock[0]
	 *   - we disable events on state.sock[0]
	 *   - the timer event restarts the loop
	 * Then we close state.sock[1]
	 * We have a loop:
	 *   - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
	 *   - we try to read 1 byte
	 *   - if the read gets an error or returns 0
	 *     we disable the event handler
	 *   - the timer finishes the test
	 */
	state.sock[0] = -1;
	state.sock[1] = -1;

	ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
	torture_assert(tctx, ret == 0, "socketpair() failed");

	state.te = tevent_add_timer(state.ev, state.ev,
				    timeval_current_ofs(0,1000),
				    test_event_fd1_finished, &state);
	state.fde0 = tevent_add_fd(state.ev, state.ev,
				   state.sock[0], TEVENT_FD_WRITE,
				   test_event_fd1_fde_handler, &state);
	/* state.fde1 is only used to auto close */
	state.fde1 = tevent_add_fd(state.ev, state.ev,
				   state.sock[1], 0,
				   test_event_fd1_fde_handler, &state);

	tevent_fd_set_auto_close(state.fde0);
	tevent_fd_set_auto_close(state.fde1);

	while (!state.finished) {
		errno = 0;
		if (tevent_loop_once(state.ev) == -1) {
			talloc_free(state.ev);
			torture_fail(tctx, talloc_asprintf(tctx,
				     "Failed event loop %s\n",
				     strerror(errno)));
		}
	}

	talloc_free(state.ev);

	torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
		       "%s", state.error));

	return true;
}
458
/* Shared state for test_event_fd2() and its handlers. */
struct test_event_fd2_state {
	struct torture_context *tctx;	/* torture test context */
	const char *backend;		/* tevent backend under test */
	struct tevent_context *ev;
	struct tevent_timer *te;	/* watchdog timer; must never fire */
	/* per-direction bookkeeping, one instance per socketpair end */
	struct test_event_fd2_sock {
		struct test_event_fd2_state *state;	/* back-pointer to shared state */
		int fd;
		struct tevent_fd *fde;
		size_t num_written;	/* bytes written into this fd */
		size_t num_read;	/* bytes read from this fd */
		bool got_full;		/* socket buffer filled; now reading */
	} sock0, sock1;
	bool finished;			/* main loop exit flag */
	const char *error;		/* __location__ of first failure, or NULL */
};
475
/*
 * fd handler for test_event_fd2(), used symmetrically for both ends of
 * the socketpair: while the socket buffer is not full it writes
 * sequence-numbered bytes on TEVENT_FD_WRITE, then reads them back one
 * per callback and verifies the sequence.  The direction finishes when
 * everything the other end wrote has been read back; the test finishes
 * when both directions are done.
 */
static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
					struct tevent_fd *fde,
					uint16_t flags,
					void *private_data)
{
	struct test_event_fd2_sock *cur_sock =
		(struct test_event_fd2_sock *)private_data;
	struct test_event_fd2_state *state = cur_sock->state;
	struct test_event_fd2_sock *oth_sock = NULL;
	uint8_t v = 0, c;
	ssize_t ret;

	if (cur_sock == &state->sock0) {
		oth_sock = &state->sock1;
	} else {
		oth_sock = &state->sock0;
	}

	/* first callback: one byte is pending and the buffer is empty,
	 * so both READ and WRITE must be reported together */
	if (oth_sock->num_written == 1) {
		if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
			state->finished = true;
			state->error = __location__;
			return;
		}
	}

	/* nothing pending to read would mean a spurious callback */
	if (cur_sock->num_read == oth_sock->num_written) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	if (!(flags & TEVENT_FD_READ)) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	if (oth_sock->num_read >= PIPE_BUF) {
		/*
		 * On Linux we become writable once we've read
		 * one byte. On Solaris we only become writable
		 * again once we've read 4096 bytes. PIPE_BUF
		 * is probably a safe bet to test against.
		 *
		 * There should be room to write a byte again
		 */
		if (!(flags & TEVENT_FD_WRITE)) {
			state->finished = true;
			state->error = __location__;
			return;
		}
	}

	/* fill phase: keep writing sequence-numbered bytes */
	if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
		v = (uint8_t)cur_sock->num_written;
		ret = write(cur_sock->fd, &v, 1);
		if (ret != 1) {
			state->finished = true;
			state->error = __location__;
			return;
		}
		cur_sock->num_written++;
		/* sanity cap: the buffer should fill long before this */
		if (cur_sock->num_written > 0x80000000) {
			state->finished = true;
			state->error = __location__;
			return;
		}
		return;
	}

	if (!cur_sock->got_full) {
		cur_sock->got_full = true;

		if (!oth_sock->got_full) {
			/*
			 * cur_sock is full,
			 * lets wait for oth_sock
			 * to be filled
			 */
			tevent_fd_set_flags(cur_sock->fde, 0);
			return;
		}

		/*
		 * oth_sock waited for cur_sock,
		 * lets restart it
		 */
		tevent_fd_set_flags(oth_sock->fde,
				    TEVENT_FD_READ|TEVENT_FD_WRITE);
	}

	/* read phase: one byte per callback, verify the sequence */
	ret = read(cur_sock->fd, &v, 1);
	if (ret != 1) {
		state->finished = true;
		state->error = __location__;
		return;
	}
	c = (uint8_t)cur_sock->num_read;
	if (c != v) {
		state->finished = true;
		state->error = __location__;
		return;
	}
	cur_sock->num_read++;

	if (cur_sock->num_read < oth_sock->num_written) {
		/* there is more to read */
		return;
	}
	/*
	 * we read everything, we need to remove TEVENT_FD_WRITE
	 * to avoid spinning
	 */
	TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);

	if (oth_sock->num_read == cur_sock->num_written) {
		/*
		 * both directions are finished
		 */
		state->finished = true;
	}

	return;
}
601
602 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
603                                     struct tevent_timer *te,
604                                     struct timeval tval,
605                                     void *private_data)
606 {
607         struct test_event_fd2_state *state =
608                 (struct test_event_fd2_state *)private_data;
609
610         /*
611          * this should never be triggered
612          */
613         state->finished = true;
614         state->error = __location__;
615 }
616
617 static bool test_event_fd2(struct torture_context *tctx,
618                            const void *test_data)
619 {
620         struct test_event_fd2_state state;
621         int sock[2];
622         uint8_t c = 0;
623
624         ZERO_STRUCT(state);
625         state.tctx = tctx;
626         state.backend = (const char *)test_data;
627
628         state.ev = tevent_context_init_byname(tctx, state.backend);
629         if (state.ev == NULL) {
630                 torture_skip(tctx, talloc_asprintf(tctx,
631                              "event backend '%s' not supported\n",
632                              state.backend));
633                 return true;
634         }
635
636         tevent_set_debug_stderr(state.ev);
637         torture_comment(tctx, "backend '%s' - %s\n",
638                         state.backend, __FUNCTION__);
639
640         /*
641          * This tests the following
642          *
643          * - We write 1 byte to each socket
644          * - We wait for TEVENT_FD_READ/WRITE on both sockets
645          * - When we get TEVENT_FD_WRITE we write 1 byte
646          *   until both socket buffers are full, which
647          *   means both sockets only get TEVENT_FD_READ.
648          * - Then we read 1 byte until we have consumed
649          *   all bytes the other end has written.
650          */
651         sock[0] = -1;
652         sock[1] = -1;
653         socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
654
655         /*
656          * the timer should never expire
657          */
658         state.te = tevent_add_timer(state.ev, state.ev,
659                                     timeval_current_ofs(600, 0),
660                                     test_event_fd2_finished, &state);
661         state.sock0.state = &state;
662         state.sock0.fd = sock[0];
663         state.sock0.fde = tevent_add_fd(state.ev, state.ev,
664                                         state.sock0.fd,
665                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
666                                         test_event_fd2_sock_handler,
667                                         &state.sock0);
668         state.sock1.state = &state;
669         state.sock1.fd = sock[1];
670         state.sock1.fde = tevent_add_fd(state.ev, state.ev,
671                                         state.sock1.fd,
672                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
673                                         test_event_fd2_sock_handler,
674                                         &state.sock1);
675
676         tevent_fd_set_auto_close(state.sock0.fde);
677         tevent_fd_set_auto_close(state.sock1.fde);
678
679         do_write(state.sock0.fd, &c, 1);
680         state.sock0.num_written++;
681         do_write(state.sock1.fd, &c, 1);
682         state.sock1.num_written++;
683
684         while (!state.finished) {
685                 errno = 0;
686                 if (tevent_loop_once(state.ev) == -1) {
687                         talloc_free(state.ev);
688                         torture_fail(tctx, talloc_asprintf(tctx,
689                                      "Failed event loop %s\n",
690                                      strerror(errno)));
691                 }
692         }
693
694         talloc_free(state.ev);
695
696         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
697                        "%s", state.error));
698
699         return true;
700 }
701
702 #ifdef HAVE_PTHREAD
703
/* Shared state for the immediate-event performance test. */
struct immediate_perf_state {
	pthread_mutex_t mutex;	/* guards count (see the #if 1 in the handler) */
	struct timeval start;	/* wall-clock start of the benchmark */
	uint64_t count;		/* number of immediate callbacks run so far */
	bool done;		/* set after ~10s of runtime to stop the loop */
};
710
/*
 * Immediate-event handler for the perf test: increments state->count
 * (under the mutex — the #if 1 keeps the locking cost in the measured
 * path), checks the wall clock every 1000 iterations and stops after
 * more than 9 elapsed seconds; otherwise it reschedules itself so the
 * immediate queue is never empty.
 */
static void immediate_perf_handler(struct tevent_context *ev,
				   struct tevent_immediate *im,
				   void *private_data)
{
	struct immediate_perf_state *state = private_data;
	int ret;

#if 1
	ret = pthread_mutex_lock(&state->mutex);
	if (ret != 0) {
		abort();
	}

	state->count += 1;

	ret = pthread_mutex_unlock(&state->mutex);
	if (ret != 0) {
		abort();
	}
#else
	state->count += 1;
#endif
	/* only consult the clock every 1000 iterations */
	if ((state->count % 1000) == 0) {
		struct timeval now = tevent_timeval_current();
		struct timeval diff;
		diff = tevent_timeval_until(&state->start, &now);

		if (diff.tv_sec > 9) {
			state->done = true;
			return;
		}
	}

	tevent_schedule_immediate(im, ev, immediate_perf_handler, state);
}
746
747 static bool test_event_context_immediate_perf(struct torture_context *test,
748                                               const void *test_data)
749 {
750         const char *backend = test_data;
751         struct tevent_context *ev;
752         struct tevent_immediate *im;
753         struct immediate_perf_state state = {
754                 .start = tevent_timeval_current()
755         };
756         int ret;
757
758         ret = pthread_mutex_init(&state.mutex, NULL);
759         torture_assert(test, ret == 0, "pthread_mutex_init failed");
760
761         ev = tevent_context_init_byname(test, backend);
762         if (ev == NULL) {
763                 torture_comment(test, "event backend '%s' not supported\n",
764                                 backend);
765                 return true;
766         }
767
768         torture_comment(test, "backend '%s' - %s\n", backend, __FUNCTION__);
769
770         im = tevent_create_immediate(ev);
771         torture_assert(test, im != NULL, "tevent_create_immediate failed");
772
773         tevent_schedule_immediate(im, ev, immediate_perf_handler, &state);
774
775         while (!state.done) {
776                 ret = tevent_loop_once(ev);
777                 torture_assert(test, ret == 0, "tevent_loop_once failed");
778         }
779
780         torture_comment(test, "count=%"PRIu64"\n", state.count);
781
782         TALLOC_FREE(ev);
783         return true;
784 }
785
/* Serialises access to the poll_mt event context between the main
 * thread and test_event_poll_thread(). */
static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
/* Set under threaded_mutex to tell the poll thread to exit. */
static bool do_shutdown = false;
788
789 static void test_event_threaded_lock(void)
790 {
791         int ret;
792         ret = pthread_mutex_lock(&threaded_mutex);
793         assert(ret == 0);
794 }
795
796 static void test_event_threaded_unlock(void)
797 {
798         int ret;
799         ret = pthread_mutex_unlock(&threaded_mutex);
800         assert(ret == 0);
801 }
802
/*
 * Trace callback installed on the poll_mt context: drops threaded_mutex
 * while the backend is blocked waiting for events and re-takes it
 * afterwards, so the main thread can safely modify the event context
 * while the poll thread is inside tevent_loop_once().
 */
static void test_event_threaded_trace(enum tevent_trace_point point,
				      void *private_data)
{
	switch (point) {
	case TEVENT_TRACE_BEFORE_WAIT:
		test_event_threaded_unlock();
		break;
	case TEVENT_TRACE_AFTER_WAIT:
		test_event_threaded_lock();
		break;
	case TEVENT_TRACE_BEFORE_LOOP_ONCE:
	case TEVENT_TRACE_AFTER_LOOP_ONCE:
		/* not interesting for this test */
		break;
	}
}
818
/*
 * Intentionally empty timer handler: the timer only exists so the
 * event loop in the poll thread always has a pending event.
 */
static void test_event_threaded_timer(struct tevent_context *ev,
				      struct tevent_timer *te,
				      struct timeval current_time,
				      void *private_data)
{
	/* deliberately does nothing */
}
826
/*
 * Worker thread for test_event_context_threaded(): runs the event loop
 * while holding threaded_mutex (it is released around the blocking wait
 * by the trace callback) until the main thread sets do_shutdown.
 */
static void *test_event_poll_thread(void *private_data)
{
	struct tevent_context *ev = (struct tevent_context *)private_data;

	test_event_threaded_lock();

	while (true) {
		int ret;
		ret = tevent_loop_once(ev);
		assert(ret == 0);
		/* do_shutdown is read while holding threaded_mutex */
		if (do_shutdown) {
			test_event_threaded_unlock();
			return NULL;
		}
	}

}
844
845 static void test_event_threaded_read_handler(struct tevent_context *ev,
846                                              struct tevent_fd *fde,
847                                              uint16_t flags,
848                                              void *private_data)
849 {
850         int *pfd = (int *)private_data;
851         char c;
852         ssize_t nread;
853
854         if ((flags & TEVENT_FD_READ) == 0) {
855                 return;
856         }
857
858         do {
859                 nread = read(*pfd, &c, 1);
860         } while ((nread == -1) && (errno == EINTR));
861
862         assert(nread == 1);
863 }
864
/*
 * Test the poll_mt backend with a dedicated poll thread: the main
 * thread adds an fd event and writes to a pipe while the poll thread
 * runs the loop; threaded_mutex plus the trace callback serialise
 * access to the event context.  The poll(NULL, 0, 100) calls are plain
 * 100ms sleeps used to let the other thread make progress.
 */
static bool test_event_context_threaded(struct torture_context *test,
					const void *test_data)
{
	struct tevent_context *ev;
	struct tevent_timer *te;
	struct tevent_fd *fde;
	pthread_t poll_thread;
	int fds[2];
	int ret;
	char c = 0;

	ev = tevent_context_init_byname(test, "poll_mt");
	torture_assert(test, ev != NULL, "poll_mt not supported");

	tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);

	/* keep the loop busy so the poll thread has an event to wait for */
	te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
			      test_event_threaded_timer, NULL);
	torture_assert(test, te != NULL, "Could not add timer");

	ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
	torture_assert(test, ret == 0, "Could not create poll thread");

	ret = pipe(fds);
	torture_assert(test, ret == 0, "Could not create pipe");

	poll(NULL, 0, 100);

	/* modify the event context only while holding the mutex */
	test_event_threaded_lock();

	fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
			    test_event_threaded_read_handler, &fds[0]);
	torture_assert(test, fde != NULL, "Could not add fd event");

	test_event_threaded_unlock();

	poll(NULL, 0, 100);

	do_write(fds[1], &c, 1);

	poll(NULL, 0, 100);

	/* signal the poll thread to exit... */
	test_event_threaded_lock();
	do_shutdown = true;
	test_event_threaded_unlock();

	/* ...and wake it up with one more byte */
	do_write(fds[1], &c, 1);

	ret = pthread_join(poll_thread, NULL);
	torture_assert(test, ret == 0, "pthread_join failed");

	return true;
}
918
#define NUM_TEVENT_THREADS 100

/* Ugly, but needed for torture_comment... */
static struct torture_context *thread_test_ctx;
/* One slot per worker thread, filled in by pthread_create() below. */
static pthread_t thread_map[NUM_TEVENT_THREADS];
/* Bumped by the master-thread callbacks; timeout handlers force it high. */
static unsigned thread_counter;
925
926 /* Called in master thread context */
927 static void callback_nowait(struct tevent_context *ev,
928                                 struct tevent_immediate *im,
929                                 void *private_ptr)
930 {
931         pthread_t *thread_id_ptr =
932                 talloc_get_type_abort(private_ptr, pthread_t);
933         unsigned i;
934
935         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
936                 if (pthread_equal(*thread_id_ptr,
937                                 thread_map[i])) {
938                         break;
939                 }
940         }
941         torture_comment(thread_test_ctx,
942                         "Callback %u from thread %u\n",
943                         thread_counter,
944                         i);
945         thread_counter++;
946 }
947
948 /* Blast the master tevent_context with a callback, no waiting. */
949 static void *thread_fn_nowait(void *private_ptr)
950 {
951         struct tevent_thread_proxy *master_tp =
952                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
953         struct tevent_immediate *im;
954         pthread_t *thread_id_ptr;
955
956         im = tevent_create_immediate(NULL);
957         if (im == NULL) {
958                 return NULL;
959         }
960         thread_id_ptr = talloc(NULL, pthread_t);
961         if (thread_id_ptr == NULL) {
962                 return NULL;
963         }
964         *thread_id_ptr = pthread_self();
965
966         tevent_thread_proxy_schedule(master_tp,
967                                 &im,
968                                 callback_nowait,
969                                 &thread_id_ptr);
970         return NULL;
971 }
972
/*
 * Watchdog for test_multi_tevent_threaded(): force the main wait
 * loop to terminate by pushing thread_counter past the target.
 */
static void timeout_fn(struct tevent_context *ev,
                        struct tevent_timer *te,
                        struct timeval tv, void *p)
{
        /* 10x the expected count makes the final equality assert fail. */
        thread_counter = NUM_TEVENT_THREADS * 10;
}
979
980 static bool test_multi_tevent_threaded(struct torture_context *test,
981                                         const void *test_data)
982 {
983         unsigned i;
984         struct tevent_context *master_ev;
985         struct tevent_thread_proxy *tp;
986
987         talloc_disable_null_tracking();
988
989         /* Ugly global stuff. */
990         thread_test_ctx = test;
991         thread_counter = 0;
992
993         master_ev = tevent_context_init(NULL);
994         if (master_ev == NULL) {
995                 return false;
996         }
997         tevent_set_debug_stderr(master_ev);
998
999         tp = tevent_thread_proxy_create(master_ev);
1000         if (tp == NULL) {
1001                 torture_fail(test,
1002                         talloc_asprintf(test,
1003                                 "tevent_thread_proxy_create failed\n"));
1004                 talloc_free(master_ev);
1005                 return false;
1006         }
1007
1008         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1009                 int ret = pthread_create(&thread_map[i],
1010                                 NULL,
1011                                 thread_fn_nowait,
1012                                 tp);
1013                 if (ret != 0) {
1014                         torture_fail(test,
1015                                 talloc_asprintf(test,
1016                                         "Failed to create thread %i, %d\n",
1017                                         i, ret));
1018                         return false;
1019                 }
1020         }
1021
1022         /* Ensure we don't wait more than 10 seconds. */
1023         tevent_add_timer(master_ev,
1024                         master_ev,
1025                         timeval_current_ofs(10,0),
1026                         timeout_fn,
1027                         NULL);
1028
1029         while (thread_counter < NUM_TEVENT_THREADS) {
1030                 int ret = tevent_loop_once(master_ev);
1031                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1032         }
1033
1034         torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1035                 "thread_counter fail\n");
1036
1037         talloc_free(master_ev);
1038         return true;
1039 }
1040
/* Request/reply state handed between a worker thread and the master. */
struct reply_state {
        struct tevent_thread_proxy *reply_tp;   /* proxy back into the worker's loop */
        pthread_t thread_id;                    /* identifies the sending worker */
        int *p_finished;                        /* worker's flag: 1 = done, 2 = timeout */
};
1046
/* Timer handler: mark the worker's wait as timed out. */
static void thread_timeout_fn(struct tevent_context *ev,
                        struct tevent_timer *te,
                        struct timeval tv, void *p)
{
        /* p points at the caller's "finished" flag; 2 means timeout. */
        *(int *)p = 2;
}
1055
1056 /* Called in child-thread context */
1057 static void thread_callback(struct tevent_context *ev,
1058                                 struct tevent_immediate *im,
1059                                 void *private_ptr)
1060 {
1061         struct reply_state *rsp =
1062                 talloc_get_type_abort(private_ptr, struct reply_state);
1063
1064         talloc_steal(ev, rsp);
1065         *rsp->p_finished = 1;
1066 }
1067
1068 /* Called in master thread context */
1069 static void master_callback(struct tevent_context *ev,
1070                                 struct tevent_immediate *im,
1071                                 void *private_ptr)
1072 {
1073         struct reply_state *rsp =
1074                 talloc_get_type_abort(private_ptr, struct reply_state);
1075         unsigned i;
1076
1077         talloc_steal(ev, rsp);
1078
1079         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1080                 if (pthread_equal(rsp->thread_id,
1081                                 thread_map[i])) {
1082                         break;
1083                 }
1084         }
1085         torture_comment(thread_test_ctx,
1086                         "Callback %u from thread %u\n",
1087                         thread_counter,
1088                         i);
1089         /* Now reply to the thread ! */
1090         tevent_thread_proxy_schedule(rsp->reply_tp,
1091                                 &im,
1092                                 thread_callback,
1093                                 &rsp);
1094
1095         thread_counter++;
1096 }
1097
/*
 * Worker thread body for the request/reply test: create a private
 * event loop and proxy, send one request to the master, then run the
 * loop until the master's reply (or a 10s timeout) arrives.
 */
static void *thread_fn_1(void *private_ptr)
{
        struct tevent_thread_proxy *master_tp =
                talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
        struct tevent_thread_proxy *tp;
        struct tevent_immediate *im;
        struct tevent_context *ev;
        struct reply_state *rsp;
        int finished = 0;
        int ret;

        ev = tevent_context_init(NULL);
        if (ev == NULL) {
                return NULL;
        }

        /* All of tp/im/rsp are children of ev, freed together. */
        tp = tevent_thread_proxy_create(ev);
        if (tp == NULL) {
                talloc_free(ev);
                return NULL;
        }

        im = tevent_create_immediate(ev);
        if (im == NULL) {
                talloc_free(ev);
                return NULL;
        }

        rsp = talloc(ev, struct reply_state);
        if (rsp == NULL) {
                talloc_free(ev);
                return NULL;
        }

        rsp->thread_id = pthread_self();
        rsp->reply_tp = tp;
        rsp->p_finished = &finished;

        /* Introduce a little randomness into the mix.. */
        usleep(random() % 7000);

        /* Ownership of im and rsp moves to the master's proxy. */
        tevent_thread_proxy_schedule(master_tp,
                                &im,
                                master_callback,
                                &rsp);

        /* Ensure we don't wait more than 10 seconds. */
        tevent_add_timer(ev,
                        ev,
                        timeval_current_ofs(10,0),
                        thread_timeout_fn,
                        &finished);

        /* finished: 0 = waiting, 1 = reply received, 2 = timeout. */
        while (finished == 0) {
                ret = tevent_loop_once(ev);
                assert(ret == 0);
        }

        if (finished > 1) {
                /* Timeout ! */
                abort();
        }

        /*
         * NB. We should talloc_free(ev) here, but if we do
         * we currently get hit by helgrind Fix #323432
         * "When calling pthread_cond_destroy or pthread_mutex_destroy
         * with initializers as argument Helgrind (incorrectly) reports errors."
         *
         * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
         * with-pthread-mutex-destroy-td47757.html
         *
         * Helgrind doesn't understand that the request/reply
         * messages provide synchronization between the lock/unlock
         * in tevent_thread_proxy_schedule(), and the pthread_destroy()
         * when the struct tevent_thread_proxy object is talloc_free'd.
         *
         * As a work-around for now return ev for the parent thread to free.
         */
        return ev;
}
1179
1180 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1181                                         const void *test_data)
1182 {
1183         unsigned i;
1184         struct tevent_context *master_ev;
1185         struct tevent_thread_proxy *master_tp;
1186         int ret;
1187
1188         talloc_disable_null_tracking();
1189
1190         /* Ugly global stuff. */
1191         thread_test_ctx = test;
1192         thread_counter = 0;
1193
1194         master_ev = tevent_context_init(NULL);
1195         if (master_ev == NULL) {
1196                 return false;
1197         }
1198         tevent_set_debug_stderr(master_ev);
1199
1200         master_tp = tevent_thread_proxy_create(master_ev);
1201         if (master_tp == NULL) {
1202                 torture_fail(test,
1203                         talloc_asprintf(test,
1204                                 "tevent_thread_proxy_create failed\n"));
1205                 talloc_free(master_ev);
1206                 return false;
1207         }
1208
1209         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1210                 ret = pthread_create(&thread_map[i],
1211                                 NULL,
1212                                 thread_fn_1,
1213                                 master_tp);
1214                 if (ret != 0) {
1215                         torture_fail(test,
1216                                 talloc_asprintf(test,
1217                                         "Failed to create thread %i, %d\n",
1218                                         i, ret));
1219                                 return false;
1220                 }
1221         }
1222
1223         while (thread_counter < NUM_TEVENT_THREADS) {
1224                 ret = tevent_loop_once(master_ev);
1225                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1226         }
1227
1228         /* Wait for all the threads to finish - join 'em. */
1229         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1230                 void *retval;
1231                 ret = pthread_join(thread_map[i], &retval);
1232                 torture_assert(test, ret == 0, "pthread_join failed");
1233                 /* Free the child thread event context. */
1234                 talloc_free(retval);
1235         }
1236
1237         talloc_free(master_ev);
1238         return true;
1239 }
1240
/* Per-thread job state for the tevent_threaded_context test. */
struct threaded_test_2 {
        struct tevent_threaded_context *tctx;   /* handle into the master loop */
        struct tevent_immediate *im;            /* fired into the master loop */
        pthread_t thread_id;                    /* set by the worker thread */
};

/* Runs in the master thread when a worker schedules its immediate. */
static void master_callback_2(struct tevent_context *ev,
                              struct tevent_immediate *im,
                              void *private_data);
1250
1251 static void *thread_fn_2(void *private_data)
1252 {
1253         struct threaded_test_2 *state = private_data;
1254
1255         state->thread_id = pthread_self();
1256
1257         usleep(random() % 7000);
1258
1259         tevent_threaded_schedule_immediate(
1260                 state->tctx, state->im, master_callback_2, state);
1261
1262         return NULL;
1263 }
1264
1265 static void master_callback_2(struct tevent_context *ev,
1266                               struct tevent_immediate *im,
1267                               void *private_data)
1268 {
1269         struct threaded_test_2 *state = private_data;
1270         int i;
1271
1272         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1273                 if (pthread_equal(state->thread_id, thread_map[i])) {
1274                         break;
1275                 }
1276         }
1277         torture_comment(thread_test_ctx,
1278                         "Callback_2 %u from thread %u\n",
1279                         thread_counter,
1280                         i);
1281         thread_counter++;
1282 }
1283
/*
 * Stress test for tevent_threaded_context: workers fire immediates
 * straight into the master loop (no per-thread loop, no proxy).
 */
static bool test_multi_tevent_threaded_2(struct torture_context *test,
                                         const void *test_data)
{
        unsigned i;

        struct tevent_context *ev;
        struct tevent_threaded_context *tctx;
        int ret;

        thread_test_ctx = test;
        thread_counter = 0;

        ev = tevent_context_init(test);
        torture_assert(test, ev != NULL, "tevent_context_init failed");

        /*
         * tevent_re_initialise used to have a bug where it did not
         * re-initialise the thread support after taking it
         * down. Exercise that code path.
         */
        ret = tevent_re_initialise(ev);
        torture_assert(test, ret == 0, "tevent_re_initialise failed");

        tctx = tevent_threaded_context_create(ev, ev);
        torture_assert(test, tctx != NULL,
                       "tevent_threaded_context_create failed");

        for (i=0; i<NUM_TEVENT_THREADS; i++) {
                struct threaded_test_2 *state;

                /* Parented to ev; freed when ev is freed below. */
                state = talloc(ev, struct threaded_test_2);
                torture_assert(test, state != NULL, "talloc failed");

                state->tctx = tctx;
                state->im = tevent_create_immediate(state);
                torture_assert(test, state->im != NULL,
                               "tevent_create_immediate failed");

                ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
                torture_assert(test, ret == 0, "pthread_create failed");
        }

        while (thread_counter < NUM_TEVENT_THREADS) {
                ret = tevent_loop_once(ev);
                torture_assert(test, ret == 0, "tevent_loop_once failed");
        }

        /* Wait for all the threads to finish - join 'em. */
        for (i = 0; i < NUM_TEVENT_THREADS; i++) {
                void *retval;
                ret = pthread_join(thread_map[i], &retval);
                torture_assert(test, ret == 0, "pthread_join failed");
                /* thread_fn_2 returns NULL, so retval needs no free here. */
        }

        talloc_free(tctx);
        talloc_free(ev);
        return true;
}
1343
/* Shared job state for the threadpool round-trip test. */
struct tevent_threaded_test_state {
        bool ok;        /* set by the worker: talloc_asprintf succeeded */
        int id;         /* job id, starting at 1 */
        char *msg;      /* message exchanged between worker and main thread */
};

/* Worker-thread job function. */
static void tevent_threaded_test_do(void *private_data);
/* Main-thread completion callback. */
static void tevent_threaded_test_done(struct tevent_req *subreq);
1352
1353 static bool tevent_threaded_test(struct torture_context *test,
1354                                  const void *test_data)
1355 {
1356         int result;
1357         bool ok;
1358         struct tevent_context *ev;
1359         struct tevent_threadpool *pool;
1360         struct tevent_threaded_test_state *state;
1361 #define NUMREQ 10
1362         struct tevent_req *subreq[NUMREQ];
1363         int i;
1364
1365         talloc_disable_null_tracking();
1366
1367         ev = tevent_context_init(NULL);
1368         torture_assert_goto(test, ev != NULL, ok, done,
1369                             "tevent_context_init failed\n");
1370
1371         tevent_set_debug_stderr(ev);
1372
1373         pool = tevent_threadpool_create(ev, ev, 10);
1374         torture_assert_goto(test, pool != NULL, ok, done,
1375                             "tevent_threaded_init failed\n");
1376
1377         /*
1378          * Starting at 1 helps debugging as the threadpool uses job
1379          * ids starting at 1
1380          */
1381         for (i = 1; i <= NUMREQ; i++) {
1382 #undef NUMREQ
1383                 printf("[mainthread] Sending request %d\n", i);
1384
1385                 state = talloc_zero(NULL, struct tevent_threaded_test_state);
1386                 torture_assert_goto(test, state != NULL, ok, done,
1387                                     "talloc_zero failed\n");
1388
1389                 state->id = i;
1390                 state->msg = talloc_asprintf(state, "request %d", state->id);
1391                 torture_assert_goto(test, state->msg != NULL, ok, done,
1392                                     "talloc_asprintf failed\n");
1393
1394                 subreq[i-1] = tevent_threadpool_send(pool,
1395                                                      tevent_threaded_test_do,
1396                                                      state);
1397                 torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
1398                                     "tevent_threadpool_send failed\n");
1399                 tevent_req_set_callback(subreq[i-1], tevent_threaded_test_done, state);
1400         }
1401
1402         result = tevent_loop_wait(ev);
1403         torture_assert_goto(test, result == 0, ok, done,
1404                             "tevent_loop_wait failed\n");
1405
1406         ok = true;
1407
1408 done:
1409         talloc_free(ev);
1410         return ok;
1411 }
1412
1413 static void tevent_threaded_test_do(void *private_data)
1414 {
1415         struct tevent_threaded_test_state *state =
1416                 talloc_get_type_abort(private_data, struct tevent_threaded_test_state);
1417
1418         printf("[workerthread] job id: %d, msg: %s\n", state->id, state->msg);
1419         talloc_free(state->msg);
1420
1421         state->msg = talloc_asprintf(state, "Thread %d responding", state->id);
1422         if (state->msg == NULL) {
1423                 state->ok = false;
1424                 return;
1425         }
1426         state->ok = true;
1427 }
1428
1429 static void tevent_threaded_test_done(struct tevent_req *subreq)
1430 {
1431         int result, error;
1432         struct tevent_threaded_test_state *state =
1433                 tevent_req_callback_data(subreq, struct tevent_threaded_test_state);
1434
1435         printf("[mainthread] callback: %d done, %s\n", state->id, state->msg);
1436
1437         result = tevent_threadpool_recv(subreq, &error);
1438         if (result != 0) {
1439                 printf("[mainthread] recv failed!\n");
1440         }
1441         if (!state->ok) {
1442                 printf("[mainthread] failed!\n");
1443         }
1444
1445         TALLOC_FREE(subreq);
1446         TALLOC_FREE(state);
1447 }
1448
/* Job state for the threadpool cancellation test. */
struct tevent_threaded_canceltest_state {
        struct torture_context *tctx;   /* for torture output */
        bool *torture_result;           /* overall pass/fail flag in the test fn */
        bool ok;                        /* set by the worker: allocation succeeded */
        int id;                         /* job id (1, 2, then reused as 3) */
        char *msg;                      /* message exchanged with the worker */
};

/* Worker-thread job function. */
static void tevent_threaded_canceltest_do(void *private_data);
/* Main-thread completion callback. */
static void tevent_threaded_canceltest_done(struct tevent_req *subreq);
1459
/*
 * Exercise threadpool job cancellation: queue two jobs on a
 * single-thread pool, cancel the second via TALLOC_FREE() of its
 * request before the worker reaches it, then reuse the cancelled
 * job's state for a third request.
 */
static bool tevent_threaded_canceltest(struct torture_context *test,
                                       const void *test_data)
{
        int result;
        bool ok = true;
        struct tevent_context *ev;
        struct tevent_threadpool *pool;
        struct tevent_threaded_canceltest_state *state;
        struct tevent_req *subreq[3];
        int i;

        talloc_disable_null_tracking();

        ev = tevent_context_init(NULL);
        torture_assert_goto(test, ev != NULL, ok, done,
                            "tevent_context_init failed\n");

        tevent_set_debug_stderr(ev);

        /* Pool size 1 so job 2 queues behind job 1 and can be cancelled. */
        pool = tevent_threadpool_create(ev, ev, 1);
        torture_assert_goto(test, pool != NULL, ok, done,
                            "tevent_threaded_init failed\n");

        for (i = 1; i <= 2; i++) {
                printf("[mainthread] Sending request %d\n", i);

                state = talloc_zero(pool, struct tevent_threaded_canceltest_state);
                torture_assert_goto(test, state != NULL, ok, done,
                                    "talloc_zero failed\n");

                state->id = i;
                state->tctx = test;
                state->torture_result = &ok;
                state->msg = talloc_asprintf(state, "request %d", state->id);
                torture_assert_goto(test, state->msg != NULL, ok, done,
                                    "talloc_asprintf failed\n");

                subreq[i-1] = tevent_threadpool_send(pool,
                                                     tevent_threaded_canceltest_do,
                                                     state);
                torture_assert_goto(test, subreq[i-1] != NULL, ok, done,
                                    "tevent_threadpool_send failed\n");
                tevent_req_set_callback(subreq[i-1], tevent_threaded_canceltest_done, state);
        }

        /* Cancel job 2 while the single worker is still busy with job 1. */
        TALLOC_FREE(subreq[1]);

        result = tevent_loop_wait(ev);
        torture_assert_goto(test, result == 0, ok, done,
                            "tevent_loop_wait failed\n");

        printf("sleeping...\n");
        sleep(5);

        /*
         * Now test reusing state of the request we cancelled via
         * TALLOC_FREE()
         */
        /* NB: state still points at job 2's state (the loop ended at i == 2). */
        state->id = 3;
        talloc_free(state->msg);
        state->msg = talloc_asprintf(state, "request %d", state->id);
        torture_assert_goto(test, state->msg != NULL, ok, done,
                            "talloc_asprintf failed\n");
        subreq[2] = tevent_threadpool_send(pool,
                                           tevent_threaded_canceltest_do,
                                           state);
        torture_assert_goto(test, subreq[2] != NULL, ok, done,
                            "tevent_threadpool_send failed\n");
        tevent_req_set_callback(subreq[2], tevent_threaded_canceltest_done, state);

        result = tevent_loop_wait(ev);
        torture_assert_goto(test, result == 0, ok, done,
                            "tevent_loop_wait failed\n");

        /* Any callback sets ok = false on unexpected behaviour. */
        if (!ok) {
                torture_fail(test, "some error...\n");
        }

done:
        talloc_free(ev);
        return ok;
}
1542
1543 static void tevent_threaded_canceltest_do(void *private_data)
1544 {
1545         struct tevent_threaded_canceltest_state *state =
1546                 talloc_get_type_abort(private_data, struct tevent_threaded_canceltest_state);
1547
1548         printf("[workerthread] %s\n", state->msg);
1549
1550         /*
1551          * Sleep in the first sheduled job, so the second can be
1552          * cancelled before it gets sheduled
1553          */
1554         sleep(1);
1555
1556         talloc_free(state->msg);
1557         state->msg = talloc_asprintf(state, "job id %d done", state->id);
1558         if (state->msg == NULL) {
1559                 state->ok = false;
1560                 return;
1561         }
1562         state->ok = true;
1563 }
1564
1565 static void tevent_threaded_canceltest_done(struct tevent_req *subreq)
1566 {
1567         int result, error;
1568         struct tevent_threaded_canceltest_state *state =
1569                 tevent_req_callback_data(subreq, struct tevent_threaded_canceltest_state);
1570
1571         printf("[mainthread] callback job %d\n", state->id);
1572
1573         result = tevent_threadpool_recv(subreq, &error);
1574         if (result != 0) {
1575                 if (error == EINTR) {
1576                         printf("[mainthread] cancelled request\n");
1577                         *state->torture_result = false;
1578                 }
1579                 printf("[mainthread] recv failed!\n");
1580         }
1581         if (!state->ok) {
1582                 printf("[mainthread] computation failed\n");
1583                 *state->torture_result = false;
1584         } else {
1585                 printf(state->msg);
1586         }
1587
1588         if (state->id == 2) {
1589                 printf("[mainthread] job 2's callback shouldn't be called!\n");
1590                 *state->torture_result = false;
1591         }
1592
1593         TALLOC_FREE(subreq);
1594         TALLOC_FREE(state);
1595 }
1596 #endif
1597
1598 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1599 {
1600         struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1601         const char **list = tevent_backend_list(suite);
1602         int i;
1603
1604         for (i=0;list && list[i];i++) {
1605                 struct torture_suite *backend_suite;
1606
1607                 backend_suite = torture_suite_create(mem_ctx, list[i]);
1608
1609                 torture_suite_add_simple_tcase_const(backend_suite,
1610                                                "context",
1611                                                test_event_context,
1612                                                (const void *)list[i]);
1613                 torture_suite_add_simple_tcase_const(backend_suite,
1614                                                "fd1",
1615                                                test_event_fd1,
1616                                                (const void *)list[i]);
1617                 torture_suite_add_simple_tcase_const(backend_suite,
1618                                                "fd2",
1619                                                test_event_fd2,
1620                                                (const void *)list[i]);
1621                 torture_suite_add_simple_tcase_const(
1622                         backend_suite, "immediate_perf",
1623                         test_event_context_immediate_perf,
1624                         list[i]);
1625
1626
1627                 torture_suite_add_suite(suite, backend_suite);
1628         }
1629
1630 #ifdef HAVE_PTHREAD
1631         torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1632                                              test_event_context_threaded,
1633                                              NULL);
1634
1635         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1636                                              test_multi_tevent_threaded,
1637                                              NULL);
1638
1639         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1640                                              test_multi_tevent_threaded_1,
1641                                              NULL);
1642
1643         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1644                                              test_multi_tevent_threaded_2,
1645                                              NULL);
1646
1647         torture_suite_add_simple_tcase_const(suite, "tevent_threaded_test",
1648                                              tevent_threaded_test,
1649                                              NULL);
1650         }
1651
1652         torture_suite_add_simple_tcase_const(suite, "tevent_threaded_canceltest",
1653                                              tevent_threaded_canceltest,
1654                                              NULL);
1655 #endif
1656
1657         return suite;
1658 }
1659
/*
 * Build the "event_perf" suite.
 *
 * NOTE(review): immediate_perf is guarded by HAVE_PTHREAD here but
 * added unconditionally per-backend in torture_local_event() —
 * confirm whether this guard is intended.
 */
struct torture_suite *torture_local_event_perf(TALLOC_CTX *mem_ctx)
{
        struct torture_suite *suite = torture_suite_create(
                mem_ctx, "event_perf");

#ifdef HAVE_PTHREAD
        torture_suite_add_simple_tcase_const(suite, "immediate_perf",
                                             test_event_context_immediate_perf,
                                             NULL);
#endif

        return suite;
}