Revert "test_event_fd1 error only on read"
[metze/samba/wip.git] / lib / tevent / testsuite.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    testing of the events subsystem
5
6    Copyright (C) Stefan Metzmacher 2006-2009
7    Copyright (C) Jeremy Allison    2013
8
9      ** NOTE! The following LGPL license applies to the tevent
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "includes.h"
28 #include "lib/tevent/tevent.h"
29 #include "system/filesys.h"
30 #include "system/select.h"
31 #include "system/network.h"
32 #include "torture/torture.h"
33 #include "torture/local/proto.h"
34 #ifdef HAVE_PTHREAD
35 #include <pthread.h>
36 #include <assert.h>
37 #endif
38
/*
 * Number of pipe bytes consumed by fde_handler_read() and
 * fde_handler_read_1(); test_event_context() resets it before use.
 */
static int fde_count = 0;
40
/*
 * read() wrapper that retries for as long as the call is interrupted
 * by a signal (EINTR). Any other outcome, including errors and short
 * reads, ends the attempt; the result is not reported to the caller.
 */
static void do_read(int fd, void *buf, size_t count)
{
	for (;;) {
		ssize_t nread = read(fd, buf, count);

		if (nread != -1 || errno != EINTR) {
			break;
		}
	}
}
49
50 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
51                         uint16_t flags, void *private_data)
52 {
53         int *fd = (int *)private_data;
54         char c;
55 #ifdef SA_SIGINFO
56         kill(getpid(), SIGUSR1);
57 #endif
58         kill(getpid(), SIGALRM);
59
60         do_read(fd[0], &c, 1);
61         fde_count++;
62 }
63
/*
 * write() wrapper that retries for as long as the call is interrupted
 * by a signal (EINTR). Any other outcome, including errors and short
 * writes, ends the attempt; the result is not reported to the caller.
 */
static void do_write(int fd, void *buf, size_t count)
{
	for (;;) {
		ssize_t nwritten = write(fd, buf, count);

		if (nwritten != -1 || errno != EINTR) {
			break;
		}
	}
}
72
73 static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
74                         uint16_t flags, void *private_data)
75 {
76         int *fd = (int *)private_data;
77         char c = 0;
78
79         do_write(fd[1], &c, 1);
80 }
81
82
83 /* This will only fire if the fd's returned from pipe() are bi-directional. */
84 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
85                         uint16_t flags, void *private_data)
86 {
87         int *fd = (int *)private_data;
88         char c;
89 #ifdef SA_SIGINFO
90         kill(getpid(), SIGUSR1);
91 #endif
92         kill(getpid(), SIGALRM);
93
94         do_read(fd[1], &c, 1);
95         fde_count++;
96 }
97
98 /* This will only fire if the fd's returned from pipe() are bi-directional. */
99 static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
100                         uint16_t flags, void *private_data)
101 {
102         int *fd = (int *)private_data;
103         char c = 0;
104         do_write(fd[0], &c, 1);
105 }
106
/*
 * Timer callback: private_data points at an int "finished" flag,
 * which is set to 1 so the waiting loop can stop.
 */
static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
			     struct timeval tval, void *private_data)
{
	int *done_flag = private_data;

	*done_flag = 1;
}
113
/*
 * Signal callback: adds the delivered signal count to the int counter
 * that private_data points at.
 */
static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
			  int signum, int count, void *info, void *private_data)
{
	int *total = private_data;

	*total = *total + count;
}
120
121 static bool test_event_context(struct torture_context *test,
122                                const void *test_data)
123 {
124         struct tevent_context *ev_ctx;
125         int fd[2] = { -1, -1 };
126         const char *backend = (const char *)test_data;
127         int alarm_count=0, info_count=0;
128         struct tevent_fd *fde_read;
129         struct tevent_fd *fde_read_1;
130         struct tevent_fd *fde_write;
131         struct tevent_fd *fde_write_1;
132 #ifdef SA_RESTART
133         struct tevent_signal *se1 = NULL;
134 #endif
135 #ifdef SA_RESETHAND
136         struct tevent_signal *se2 = NULL;
137 #endif
138 #ifdef SA_SIGINFO
139         struct tevent_signal *se3 = NULL;
140 #endif
141         int finished=0;
142         struct timeval t;
143         int ret;
144
145         ev_ctx = tevent_context_init_byname(test, backend);
146         if (ev_ctx == NULL) {
147                 torture_comment(test, "event backend '%s' not supported\n", backend);
148                 return true;
149         }
150
151         torture_comment(test, "backend '%s' - %s\n",
152                         backend, __FUNCTION__);
153
154         /* reset globals */
155         fde_count = 0;
156
157         /* create a pipe */
158         ret = pipe(fd);
159         torture_assert_int_equal(test, ret, 0, "pipe failed");
160
161         fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
162                             fde_handler_read, fd);
163         fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
164                             fde_handler_write_1, fd);
165
166         fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
167                             fde_handler_write, fd);
168         fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
169                             fde_handler_read_1, fd);
170
171         tevent_fd_set_auto_close(fde_read);
172         tevent_fd_set_auto_close(fde_write);
173
174         tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
175                          finished_handler, &finished);
176
177 #ifdef SA_RESTART
178         se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
179         torture_assert(test, se1 != NULL, "failed to setup se1");
180 #endif
181 #ifdef SA_RESETHAND
182         se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
183         torture_assert(test, se2 != NULL, "failed to setup se2");
184 #endif
185 #ifdef SA_SIGINFO
186         se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
187         torture_assert(test, se3 != NULL, "failed to setup se3");
188 #endif
189
190         t = timeval_current();
191         while (!finished) {
192                 errno = 0;
193                 if (tevent_loop_once(ev_ctx) == -1) {
194                         talloc_free(ev_ctx);
195                         torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
196                 }
197         }
198
199         talloc_free(fde_read_1);
200         talloc_free(fde_write_1);
201         talloc_free(fde_read);
202         talloc_free(fde_write);
203
204         while (alarm_count < fde_count+1) {
205                 if (tevent_loop_once(ev_ctx) == -1) {
206                         break;
207                 }
208         }
209
210         torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
211
212 #ifdef SA_RESTART
213         talloc_free(se1);
214 #endif
215
216         torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
217
218 #ifdef SA_RESETHAND
219         /*
220          * we do not call talloc_free(se2)
221          * because it is already gone,
222          * after triggering the event handler.
223          */
224 #endif
225
226 #ifdef SA_SIGINFO
227         talloc_free(se3);
228         torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
229 #endif
230
231         talloc_free(ev_ctx);
232
233         return true;
234 }
235
/* Shared state of test_event_fd1() and its fd/timer callbacks. */
struct test_event_fd1_state {
	struct torture_context *tctx;	/* torture context of the running test */
	const char *backend;		/* backend name under test */
	struct tevent_context *ev;	/* event context driving the test */
	int sock[2];			/* socketpair; sock[0] is the monitored end */
	struct tevent_timer *te;	/* most recently armed restart timer */
	struct tevent_fd *fde0;		/* fd event on sock[0] */
	struct tevent_fd *fde1;		/* fd event on sock[1], kept only for auto close */
	bool got_write;			/* writable event seen in the current round */
	bool got_read;			/* readable event seen in the current round */
	bool drain;			/* drain phase started (fde1 has been freed) */
	bool drain_done;		/* a drain read did not return 1 byte */
	unsigned int loop_count;	/* number of rounds restarted by the timer */
	bool finished;			/* tells the main loop to stop */
	const char *error;		/* __location__ of a failure, NULL if none */
};
252
253 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
254                                        struct tevent_fd *fde,
255                                        uint16_t flags,
256                                        void *private_data)
257 {
258         struct test_event_fd1_state *state =
259                 (struct test_event_fd1_state *)private_data;
260
261         if (state->drain_done) {
262                 state->finished = true;
263                 state->error = __location__;
264                 return;
265         }
266
267         if (state->drain) {
268                 ssize_t ret;
269                 uint8_t c = 0;
270
271                 if (!(flags & TEVENT_FD_READ)) {
272                         state->finished = true;
273                         state->error = __location__;
274                         return;
275                 }
276
277                 ret = read(state->sock[0], &c, 1);
278                 if (ret == 1) {
279                         return;
280                 }
281
282                 /*
283                  * end of test...
284                  */
285                 tevent_fd_set_flags(fde, 0);
286                 state->drain_done = true;
287                 return;
288         }
289
290         if (!state->got_write) {
291                 uint8_t c = 0;
292
293                 if (flags != TEVENT_FD_WRITE) {
294                         state->finished = true;
295                         state->error = __location__;
296                         return;
297                 }
298                 state->got_write = true;
299
300                 /*
301                  * we write to the other socket...
302                  */
303                 do_write(state->sock[1], &c, 1);
304                 TEVENT_FD_NOT_WRITEABLE(fde);
305                 TEVENT_FD_READABLE(fde);
306                 return;
307         }
308
309         if (!state->got_read) {
310                 if (flags != TEVENT_FD_READ) {
311                         state->finished = true;
312                         state->error = __location__;
313                         return;
314                 }
315                 state->got_read = true;
316
317                 TEVENT_FD_NOT_READABLE(fde);
318                 return;
319         }
320
321         state->finished = true;
322         state->error = __location__;
323         return;
324 }
325
326 static void test_event_fd1_finished(struct tevent_context *ev_ctx,
327                                     struct tevent_timer *te,
328                                     struct timeval tval,
329                                     void *private_data)
330 {
331         struct test_event_fd1_state *state =
332                 (struct test_event_fd1_state *)private_data;
333
334         if (state->drain_done) {
335                 state->finished = true;
336                 return;
337         }
338
339         if (!state->got_write) {
340                 state->finished = true;
341                 state->error = __location__;
342                 return;
343         }
344
345         if (!state->got_read) {
346                 state->finished = true;
347                 state->error = __location__;
348                 return;
349         }
350
351         state->loop_count++;
352         if (state->loop_count > 3) {
353                 state->finished = true;
354                 state->error = __location__;
355                 return;
356         }
357
358         state->got_write = false;
359         state->got_read = false;
360
361         tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
362
363         if (state->loop_count > 2) {
364                 state->drain = true;
365                 TALLOC_FREE(state->fde1);
366                 TEVENT_FD_READABLE(state->fde0);
367         }
368
369         state->te = tevent_add_timer(state->ev, state->ev,
370                                     timeval_current_ofs(0,2000),
371                                     test_event_fd1_finished, state);
372 }
373
374 static bool test_event_fd1(struct torture_context *tctx,
375                            const void *test_data)
376 {
377         struct test_event_fd1_state state;
378
379         ZERO_STRUCT(state);
380         state.tctx = tctx;
381         state.backend = (const char *)test_data;
382
383         state.ev = tevent_context_init_byname(tctx, state.backend);
384         if (state.ev == NULL) {
385                 torture_skip(tctx, talloc_asprintf(tctx,
386                              "event backend '%s' not supported\n",
387                              state.backend));
388                 return true;
389         }
390
391         tevent_set_debug_stderr(state.ev);
392         torture_comment(tctx, "backend '%s' - %s\n",
393                         state.backend, __FUNCTION__);
394
395         /*
396          * This tests the following:
397          *
398          * It monitors the state of state.sock[0]
399          * with tevent_fd, but we never read/write on state.sock[0]
400          * while state.sock[1] * is only used to write a few bytes.
401          *
402          * We have a loop:
403          *   - we wait only for TEVENT_FD_WRITE on state.sock[0]
404          *   - we write 1 byte to state.sock[1]
405          *   - we wait only for TEVENT_FD_READ on state.sock[0]
406          *   - we disable events on state.sock[0]
407          *   - the timer event restarts the loop
408          * Then we close state.sock[1]
409          * We have a loop:
410          *   - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
411          *   - we try to read 1 byte
412          *   - if the read gets an error of returns 0
413          *     we disable the event handler
414          *   - the timer finishes the test
415          */
416         state.sock[0] = -1;
417         state.sock[1] = -1;
418         socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
419
420         state.te = tevent_add_timer(state.ev, state.ev,
421                                     timeval_current_ofs(0,1000),
422                                     test_event_fd1_finished, &state);
423         state.fde0 = tevent_add_fd(state.ev, state.ev,
424                                    state.sock[0], TEVENT_FD_WRITE,
425                                    test_event_fd1_fde_handler, &state);
426         /* state.fde1 is only used to auto close */
427         state.fde1 = tevent_add_fd(state.ev, state.ev,
428                                    state.sock[1], 0,
429                                    test_event_fd1_fde_handler, &state);
430
431         tevent_fd_set_auto_close(state.fde0);
432         tevent_fd_set_auto_close(state.fde1);
433
434         while (!state.finished) {
435                 errno = 0;
436                 if (tevent_loop_once(state.ev) == -1) {
437                         talloc_free(state.ev);
438                         torture_fail(tctx, talloc_asprintf(tctx,
439                                      "Failed event loop %s\n",
440                                      strerror(errno)));
441                 }
442         }
443
444         talloc_free(state.ev);
445
446         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
447                        "%s", state.error));
448
449         return true;
450 }
451
struct test_event_fd2_state;

/* Per-socket bookkeeping of test_event_fd2(); one instance per socketpair end. */
struct test_event_fd2_sock {
	struct test_event_fd2_state *state;	/* back pointer to the owning state */
	int fd;					/* this end of the socketpair */
	struct tevent_fd *fde;			/* fd event registered for fd */
	size_t num_written;			/* bytes written to fd so far */
	size_t num_read;			/* bytes read from fd so far */
	bool got_full;				/* fd stopped being writable (buffer full) */
};

/* Shared state of test_event_fd2() and its callbacks. */
struct test_event_fd2_state {
	struct torture_context *tctx;	/* torture context of the running test */
	const char *backend;		/* backend name under test */
	struct tevent_context *ev;	/* event context driving the test */
	struct tevent_timer *te;	/* timer that should never expire */
	struct test_event_fd2_sock sock0, sock1;
	bool finished;			/* tells the main loop to stop */
	const char *error;		/* __location__ of a failure, NULL if none */
};
468
469 static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
470                                         struct tevent_fd *fde,
471                                         uint16_t flags,
472                                         void *private_data)
473 {
474         struct test_event_fd2_sock *cur_sock =
475                 (struct test_event_fd2_sock *)private_data;
476         struct test_event_fd2_state *state = cur_sock->state;
477         struct test_event_fd2_sock *oth_sock = NULL;
478         uint8_t v = 0, c;
479         ssize_t ret;
480
481         if (cur_sock == &state->sock0) {
482                 oth_sock = &state->sock1;
483         } else {
484                 oth_sock = &state->sock0;
485         }
486
487         if (oth_sock->num_written == 1) {
488                 if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
489                         state->finished = true;
490                         state->error = __location__;
491                         return;
492                 }
493         }
494
495         if (cur_sock->num_read == oth_sock->num_written) {
496                 state->finished = true;
497                 state->error = __location__;
498                 return;
499         }
500
501         if (!(flags & TEVENT_FD_READ)) {
502                 state->finished = true;
503                 state->error = __location__;
504                 return;
505         }
506
507         if (oth_sock->num_read >= PIPE_BUF) {
508                 /*
509                  * On Linux we become writable once we've read
510                  * one byte. On Solaris we only become writable
511                  * again once we've read 4096 bytes. PIPE_BUF
512                  * is probably a safe bet to test against.
513                  *
514                  * There should be room to write a byte again
515                  */
516                 if (!(flags & TEVENT_FD_WRITE)) {
517                         state->finished = true;
518                         state->error = __location__;
519                         return;
520                 }
521         }
522
523         if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
524                 v = (uint8_t)cur_sock->num_written;
525                 ret = write(cur_sock->fd, &v, 1);
526                 if (ret != 1) {
527                         state->finished = true;
528                         state->error = __location__;
529                         return;
530                 }
531                 cur_sock->num_written++;
532                 if (cur_sock->num_written > 0x80000000) {
533                         state->finished = true;
534                         state->error = __location__;
535                         return;
536                 }
537                 return;
538         }
539
540         if (!cur_sock->got_full) {
541                 cur_sock->got_full = true;
542
543                 if (!oth_sock->got_full) {
544                         /*
545                          * cur_sock is full,
546                          * lets wait for oth_sock
547                          * to be filled
548                          */
549                         tevent_fd_set_flags(cur_sock->fde, 0);
550                         return;
551                 }
552
553                 /*
554                  * oth_sock waited for cur_sock,
555                  * lets restart it
556                  */
557                 tevent_fd_set_flags(oth_sock->fde,
558                                     TEVENT_FD_READ|TEVENT_FD_WRITE);
559         }
560
561         ret = read(cur_sock->fd, &v, 1);
562         if (ret != 1) {
563                 state->finished = true;
564                 state->error = __location__;
565                 return;
566         }
567         c = (uint8_t)cur_sock->num_read;
568         if (c != v) {
569                 state->finished = true;
570                 state->error = __location__;
571                 return;
572         }
573         cur_sock->num_read++;
574
575         if (cur_sock->num_read < oth_sock->num_written) {
576                 /* there is more to read */
577                 return;
578         }
579         /*
580          * we read everything, we need to remove TEVENT_FD_WRITE
581          * to avoid spinning
582          */
583         TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
584
585         if (oth_sock->num_read == cur_sock->num_written) {
586                 /*
587                  * both directions are finished
588                  */
589                 state->finished = true;
590         }
591
592         return;
593 }
594
595 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
596                                     struct tevent_timer *te,
597                                     struct timeval tval,
598                                     void *private_data)
599 {
600         struct test_event_fd2_state *state =
601                 (struct test_event_fd2_state *)private_data;
602
603         /*
604          * this should never be triggered
605          */
606         state->finished = true;
607         state->error = __location__;
608 }
609
610 static bool test_event_fd2(struct torture_context *tctx,
611                            const void *test_data)
612 {
613         struct test_event_fd2_state state;
614         int sock[2];
615         uint8_t c = 0;
616
617         ZERO_STRUCT(state);
618         state.tctx = tctx;
619         state.backend = (const char *)test_data;
620
621         state.ev = tevent_context_init_byname(tctx, state.backend);
622         if (state.ev == NULL) {
623                 torture_skip(tctx, talloc_asprintf(tctx,
624                              "event backend '%s' not supported\n",
625                              state.backend));
626                 return true;
627         }
628
629         tevent_set_debug_stderr(state.ev);
630         torture_comment(tctx, "backend '%s' - %s\n",
631                         state.backend, __FUNCTION__);
632
633         /*
634          * This tests the following
635          *
636          * - We write 1 byte to each socket
637          * - We wait for TEVENT_FD_READ/WRITE on both sockets
638          * - When we get TEVENT_FD_WRITE we write 1 byte
639          *   until both socket buffers are full, which
640          *   means both sockets only get TEVENT_FD_READ.
641          * - Then we read 1 byte until we have consumed
642          *   all bytes the other end has written.
643          */
644         sock[0] = -1;
645         sock[1] = -1;
646         socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
647
648         /*
649          * the timer should never expire
650          */
651         state.te = tevent_add_timer(state.ev, state.ev,
652                                     timeval_current_ofs(600, 0),
653                                     test_event_fd2_finished, &state);
654         state.sock0.state = &state;
655         state.sock0.fd = sock[0];
656         state.sock0.fde = tevent_add_fd(state.ev, state.ev,
657                                         state.sock0.fd,
658                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
659                                         test_event_fd2_sock_handler,
660                                         &state.sock0);
661         state.sock1.state = &state;
662         state.sock1.fd = sock[1];
663         state.sock1.fde = tevent_add_fd(state.ev, state.ev,
664                                         state.sock1.fd,
665                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
666                                         test_event_fd2_sock_handler,
667                                         &state.sock1);
668
669         tevent_fd_set_auto_close(state.sock0.fde);
670         tevent_fd_set_auto_close(state.sock1.fde);
671
672         do_write(state.sock0.fd, &c, 1);
673         state.sock0.num_written++;
674         do_write(state.sock1.fd, &c, 1);
675         state.sock1.num_written++;
676
677         while (!state.finished) {
678                 errno = 0;
679                 if (tevent_loop_once(state.ev) == -1) {
680                         talloc_free(state.ev);
681                         torture_fail(tctx, talloc_asprintf(tctx,
682                                      "Failed event loop %s\n",
683                                      strerror(errno)));
684                 }
685         }
686
687         talloc_free(state.ev);
688
689         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
690                        "%s", state.error));
691
692         return true;
693 }
694
695 #ifdef HAVE_PTHREAD
696
/*
 * Mutex shared by the poll thread and the main thread in the poll_mt
 * test; the trace callback drops it around the backend's wait.
 */
static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;

/* Set by the main thread (under threaded_mutex) to stop the poll thread. */
static bool do_shutdown = false;
699
700 static void test_event_threaded_lock(void)
701 {
702         int ret;
703         ret = pthread_mutex_lock(&threaded_mutex);
704         assert(ret == 0);
705 }
706
707 static void test_event_threaded_unlock(void)
708 {
709         int ret;
710         ret = pthread_mutex_unlock(&threaded_mutex);
711         assert(ret == 0);
712 }
713
714 static void test_event_threaded_trace(enum tevent_trace_point point,
715                                       void *private_data)
716 {
717         switch (point) {
718         case TEVENT_TRACE_BEFORE_WAIT:
719                 test_event_threaded_unlock();
720                 break;
721         case TEVENT_TRACE_AFTER_WAIT:
722                 test_event_threaded_lock();
723                 break;
724         case TEVENT_TRACE_BEFORE_LOOP_ONCE:
725         case TEVENT_TRACE_AFTER_LOOP_ONCE:
726                 break;
727         }
728 }
729
/* Timer callback of the poll_mt test; intentionally does nothing. */
static void test_event_threaded_timer(struct tevent_context *ev,
				      struct tevent_timer *te,
				      struct timeval current_time,
				      void *private_data)
{
	/* nothing to do */
}
737
738 static void *test_event_poll_thread(void *private_data)
739 {
740         struct tevent_context *ev = (struct tevent_context *)private_data;
741
742         test_event_threaded_lock();
743
744         while (true) {
745                 int ret;
746                 ret = tevent_loop_once(ev);
747                 assert(ret == 0);
748                 if (do_shutdown) {
749                         test_event_threaded_unlock();
750                         return NULL;
751                 }
752         }
753
754 }
755
756 static void test_event_threaded_read_handler(struct tevent_context *ev,
757                                              struct tevent_fd *fde,
758                                              uint16_t flags,
759                                              void *private_data)
760 {
761         int *pfd = (int *)private_data;
762         char c;
763         ssize_t nread;
764
765         if ((flags & TEVENT_FD_READ) == 0) {
766                 return;
767         }
768
769         do {
770                 nread = read(*pfd, &c, 1);
771         } while ((nread == -1) && (errno == EINTR));
772
773         assert(nread == 1);
774 }
775
776 static bool test_event_context_threaded(struct torture_context *test,
777                                         const void *test_data)
778 {
779         struct tevent_context *ev;
780         struct tevent_timer *te;
781         struct tevent_fd *fde;
782         pthread_t poll_thread;
783         int fds[2];
784         int ret;
785         char c = 0;
786
787         ev = tevent_context_init_byname(test, "poll_mt");
788         torture_assert(test, ev != NULL, "poll_mt not supported");
789
790         tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
791
792         te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
793                               test_event_threaded_timer, NULL);
794         torture_assert(test, te != NULL, "Could not add timer");
795
796         ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
797         torture_assert(test, ret == 0, "Could not create poll thread");
798
799         ret = pipe(fds);
800         torture_assert(test, ret == 0, "Could not create pipe");
801
802         poll(NULL, 0, 100);
803
804         test_event_threaded_lock();
805
806         fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
807                             test_event_threaded_read_handler, &fds[0]);
808         torture_assert(test, fde != NULL, "Could not add fd event");
809
810         test_event_threaded_unlock();
811
812         poll(NULL, 0, 100);
813
814         do_write(fds[1], &c, 1);
815
816         poll(NULL, 0, 100);
817
818         test_event_threaded_lock();
819         do_shutdown = true;
820         test_event_threaded_unlock();
821
822         do_write(fds[1], &c, 1);
823
824         ret = pthread_join(poll_thread, NULL);
825         torture_assert(test, ret == 0, "pthread_join failed");
826
827         return true;
828 }
829
/* Number of worker threads started by test_multi_tevent_threaded(). */
#define NUM_TEVENT_THREADS 100

/* Ugly, but needed for torture_comment... */
static struct torture_context *thread_test_ctx;

/* Thread ids of the workers, indexed by creation order. */
static pthread_t thread_map[NUM_TEVENT_THREADS];

/* Number of callbacks that have run in the master thread so far. */
static unsigned int thread_counter;
836
837 /* Called in master thread context */
838 static void callback_nowait(struct tevent_context *ev,
839                                 struct tevent_immediate *im,
840                                 void *private_ptr)
841 {
842         pthread_t *thread_id_ptr =
843                 talloc_get_type_abort(private_ptr, pthread_t);
844         unsigned i;
845
846         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
847                 if (pthread_equal(*thread_id_ptr,
848                                 thread_map[i])) {
849                         break;
850                 }
851         }
852         torture_comment(thread_test_ctx,
853                         "Callback %u from thread %u\n",
854                         thread_counter,
855                         i);
856         thread_counter++;
857 }
858
859 /* Blast the master tevent_context with a callback, no waiting. */
860 static void *thread_fn_nowait(void *private_ptr)
861 {
862         struct tevent_thread_proxy *master_tp =
863                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
864         struct tevent_immediate *im;
865         pthread_t *thread_id_ptr;
866
867         im = tevent_create_immediate(NULL);
868         if (im == NULL) {
869                 return NULL;
870         }
871         thread_id_ptr = talloc(NULL, pthread_t);
872         if (thread_id_ptr == NULL) {
873                 return NULL;
874         }
875         *thread_id_ptr = pthread_self();
876
877         tevent_thread_proxy_schedule(master_tp,
878                                 &im,
879                                 callback_nowait,
880                                 &thread_id_ptr);
881         return NULL;
882 }
883
884 static void timeout_fn(struct tevent_context *ev,
885                         struct tevent_timer *te,
886                         struct timeval tv, void *p)
887 {
888         thread_counter = NUM_TEVENT_THREADS * 10;
889 }
890
891 static bool test_multi_tevent_threaded(struct torture_context *test,
892                                         const void *test_data)
893 {
894         unsigned i;
895         struct tevent_context *master_ev;
896         struct tevent_thread_proxy *tp;
897
898         talloc_disable_null_tracking();
899
900         /* Ugly global stuff. */
901         thread_test_ctx = test;
902         thread_counter = 0;
903
904         master_ev = tevent_context_init(NULL);
905         if (master_ev == NULL) {
906                 return false;
907         }
908         tevent_set_debug_stderr(master_ev);
909
910         tp = tevent_thread_proxy_create(master_ev);
911         if (tp == NULL) {
912                 torture_fail(test,
913                         talloc_asprintf(test,
914                                 "tevent_thread_proxy_create failed\n"));
915                 talloc_free(master_ev);
916                 return false;
917         }
918
919         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
920                 int ret = pthread_create(&thread_map[i],
921                                 NULL,
922                                 thread_fn_nowait,
923                                 tp);
924                 if (ret != 0) {
925                         torture_fail(test,
926                                 talloc_asprintf(test,
927                                         "Failed to create thread %i, %d\n",
928                                         i, ret));
929                         return false;
930                 }
931         }
932
933         /* Ensure we don't wait more than 10 seconds. */
934         tevent_add_timer(master_ev,
935                         master_ev,
936                         timeval_current_ofs(10,0),
937                         timeout_fn,
938                         NULL);
939
940         while (thread_counter < NUM_TEVENT_THREADS) {
941                 int ret = tevent_loop_once(master_ev);
942                 torture_assert(test, ret == 0, "tevent_loop_once failed");
943         }
944
945         torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
946                 "thread_counter fail\n");
947
948         talloc_free(master_ev);
949         return true;
950 }
951
/*
 * Request sent by a thread_fn_1() worker to the master thread, and then
 * sent back to the worker as the reply.
 */
struct reply_state {
	/* Proxy of the sending thread; master_callback() replies through it. */
	struct tevent_thread_proxy *reply_tp;
	/* pthread_self() of the sending thread. */
	pthread_t thread_id;
	/*
	 * Points at the sender's completion flag: thread_callback() stores
	 * 1 through it, thread_timeout_fn() stores 2 into the same flag.
	 */
	int *p_finished;
};
957
/*
 * Watchdog timer handler for a thread_fn_1() worker.
 *
 * p points at the worker's int completion flag; storing 2 there marks the
 * wait as timed out (the worker treats any value > 1 as a timeout).
 * ev, te and tv are unused.
 */
static void thread_timeout_fn(struct tevent_context *ev,
			      struct tevent_timer *te,
			      struct timeval tv,
			      void *p)
{
	int *finished_flag = p;

	*finished_flag = 2;
}
966
967 /* Called in child-thread context */
968 static void thread_callback(struct tevent_context *ev,
969                                 struct tevent_immediate *im,
970                                 void *private_ptr)
971 {
972         struct reply_state *rsp =
973                 talloc_get_type_abort(private_ptr, struct reply_state);
974
975         talloc_steal(ev, rsp);
976         *rsp->p_finished = 1;
977 }
978
979 /* Called in master thread context */
980 static void master_callback(struct tevent_context *ev,
981                                 struct tevent_immediate *im,
982                                 void *private_ptr)
983 {
984         struct reply_state *rsp =
985                 talloc_get_type_abort(private_ptr, struct reply_state);
986         unsigned i;
987
988         talloc_steal(ev, rsp);
989
990         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
991                 if (pthread_equal(rsp->thread_id,
992                                 thread_map[i])) {
993                         break;
994                 }
995         }
996         torture_comment(thread_test_ctx,
997                         "Callback %u from thread %u\n",
998                         thread_counter,
999                         i);
1000         /* Now reply to the thread ! */
1001         tevent_thread_proxy_schedule(rsp->reply_tp,
1002                                 &im,
1003                                 thread_callback,
1004                                 &rsp);
1005
1006         thread_counter++;
1007 }
1008
1009 static void *thread_fn_1(void *private_ptr)
1010 {
1011         struct tevent_thread_proxy *master_tp =
1012                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1013         struct tevent_thread_proxy *tp;
1014         struct tevent_immediate *im;
1015         struct tevent_context *ev;
1016         struct reply_state *rsp;
1017         int finished = 0;
1018         int ret;
1019
1020         ev = tevent_context_init(NULL);
1021         if (ev == NULL) {
1022                 return NULL;
1023         }
1024
1025         tp = tevent_thread_proxy_create(ev);
1026         if (tp == NULL) {
1027                 talloc_free(ev);
1028                 return NULL;
1029         }
1030
1031         im = tevent_create_immediate(ev);
1032         if (im == NULL) {
1033                 talloc_free(ev);
1034                 return NULL;
1035         }
1036
1037         rsp = talloc(ev, struct reply_state);
1038         if (rsp == NULL) {
1039                 talloc_free(ev);
1040                 return NULL;
1041         }
1042
1043         rsp->thread_id = pthread_self();
1044         rsp->reply_tp = tp;
1045         rsp->p_finished = &finished;
1046
1047         /* Introduce a little randomness into the mix.. */
1048         usleep(random() % 7000);
1049
1050         tevent_thread_proxy_schedule(master_tp,
1051                                 &im,
1052                                 master_callback,
1053                                 &rsp);
1054
1055         /* Ensure we don't wait more than 10 seconds. */
1056         tevent_add_timer(ev,
1057                         ev,
1058                         timeval_current_ofs(10,0),
1059                         thread_timeout_fn,
1060                         &finished);
1061
1062         while (finished == 0) {
1063                 ret = tevent_loop_once(ev);
1064                 assert(ret == 0);
1065         }
1066
1067         if (finished > 1) {
1068                 /* Timeout ! */
1069                 abort();
1070         }
1071
1072         /*
1073          * NB. We should talloc_free(ev) here, but if we do
1074          * we currently get hit by helgrind Fix #323432
1075          * "When calling pthread_cond_destroy or pthread_mutex_destroy
1076          * with initializers as argument Helgrind (incorrectly) reports errors."
1077          *
1078          * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1079          * with-pthread-mutex-destroy-td47757.html
1080          *
1081          * Helgrind doesn't understand that the request/reply
1082          * messages provide synchronization between the lock/unlock
1083          * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1084          * when the struct tevent_thread_proxy object is talloc_free'd.
1085          *
1086          * As a work-around for now return ev for the parent thread to free.
1087          */
1088         return ev;
1089 }
1090
1091 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1092                                         const void *test_data)
1093 {
1094         unsigned i;
1095         struct tevent_context *master_ev;
1096         struct tevent_thread_proxy *master_tp;
1097         int ret;
1098
1099         talloc_disable_null_tracking();
1100
1101         /* Ugly global stuff. */
1102         thread_test_ctx = test;
1103         thread_counter = 0;
1104
1105         master_ev = tevent_context_init(NULL);
1106         if (master_ev == NULL) {
1107                 return false;
1108         }
1109         tevent_set_debug_stderr(master_ev);
1110
1111         master_tp = tevent_thread_proxy_create(master_ev);
1112         if (master_tp == NULL) {
1113                 torture_fail(test,
1114                         talloc_asprintf(test,
1115                                 "tevent_thread_proxy_create failed\n"));
1116                 talloc_free(master_ev);
1117                 return false;
1118         }
1119
1120         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1121                 ret = pthread_create(&thread_map[i],
1122                                 NULL,
1123                                 thread_fn_1,
1124                                 master_tp);
1125                 if (ret != 0) {
1126                         torture_fail(test,
1127                                 talloc_asprintf(test,
1128                                         "Failed to create thread %i, %d\n",
1129                                         i, ret));
1130                                 return false;
1131                 }
1132         }
1133
1134         while (thread_counter < NUM_TEVENT_THREADS) {
1135                 ret = tevent_loop_once(master_ev);
1136                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1137         }
1138
1139         /* Wait for all the threads to finish - join 'em. */
1140         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1141                 void *retval;
1142                 ret = pthread_join(thread_map[i], &retval);
1143                 torture_assert(test, ret == 0, "pthread_join failed");
1144                 /* Free the child thread event context. */
1145                 talloc_free(retval);
1146         }
1147
1148         talloc_free(master_ev);
1149         return true;
1150 }
1151
/*
 * Per-thread state for test_multi_tevent_threaded_2(); one instance is
 * allocated for each worker and passed to thread_fn_2().
 */
struct threaded_test_2 {
	/* Threaded context shared by all workers of the test. */
	struct tevent_threaded_context *tctx;
	/* Immediate event the worker schedules on the master. */
	struct tevent_immediate *im;
	/* Filled in by the worker itself with pthread_self(). */
	pthread_t thread_id;
};
1157
1158 static void master_callback_2(struct tevent_context *ev,
1159                               struct tevent_immediate *im,
1160                               void *private_data);
1161
1162 static void *thread_fn_2(void *private_data)
1163 {
1164         struct threaded_test_2 *state = private_data;
1165
1166         state->thread_id = pthread_self();
1167
1168         usleep(random() % 7000);
1169
1170         tevent_threaded_schedule_immediate(
1171                 state->tctx, state->im, master_callback_2, state);
1172
1173         return NULL;
1174 }
1175
1176 static void master_callback_2(struct tevent_context *ev,
1177                               struct tevent_immediate *im,
1178                               void *private_data)
1179 {
1180         struct threaded_test_2 *state = private_data;
1181         int i;
1182
1183         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1184                 if (pthread_equal(state->thread_id, thread_map[i])) {
1185                         break;
1186                 }
1187         }
1188         torture_comment(thread_test_ctx,
1189                         "Callback_2 %u from thread %u\n",
1190                         thread_counter,
1191                         i);
1192         thread_counter++;
1193 }
1194
1195 static bool test_multi_tevent_threaded_2(struct torture_context *test,
1196                                          const void *test_data)
1197 {
1198         unsigned i;
1199
1200         struct tevent_context *ev;
1201         struct tevent_threaded_context *tctx;
1202         int ret;
1203
1204         thread_test_ctx = test;
1205         thread_counter = 0;
1206
1207         ev = tevent_context_init(test);
1208         torture_assert(test, ev != NULL, "tevent_context_init failed");
1209
1210         tctx = tevent_threaded_context_create(ev, ev);
1211         torture_assert(test, tctx != NULL,
1212                        "tevent_threaded_context_create failed");
1213
1214         for (i=0; i<NUM_TEVENT_THREADS; i++) {
1215                 struct threaded_test_2 *state;
1216
1217                 state = talloc(ev, struct threaded_test_2);
1218                 torture_assert(test, state != NULL, "talloc failed");
1219
1220                 state->tctx = tctx;
1221                 state->im = tevent_create_immediate(state);
1222                 torture_assert(test, state->im != NULL,
1223                                "tevent_create_immediate failed");
1224
1225                 ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1226                 torture_assert(test, ret == 0, "pthread_create failed");
1227         }
1228
1229         while (thread_counter < NUM_TEVENT_THREADS) {
1230                 ret = tevent_loop_once(ev);
1231                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1232         }
1233
1234         /* Wait for all the threads to finish - join 'em. */
1235         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1236                 void *retval;
1237                 ret = pthread_join(thread_map[i], &retval);
1238                 torture_assert(test, ret == 0, "pthread_join failed");
1239                 /* Free the child thread event context. */
1240         }
1241
1242         talloc_free(tctx);
1243         talloc_free(ev);
1244         return true;
1245 }
1246 #endif
1247
1248 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1249 {
1250         struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1251         const char **list = tevent_backend_list(suite);
1252         int i;
1253
1254         for (i=0;list && list[i];i++) {
1255                 struct torture_suite *backend_suite;
1256
1257                 backend_suite = torture_suite_create(mem_ctx, list[i]);
1258
1259                 torture_suite_add_simple_tcase_const(backend_suite,
1260                                                "context",
1261                                                test_event_context,
1262                                                (const void *)list[i]);
1263                 torture_suite_add_simple_tcase_const(backend_suite,
1264                                                "fd1",
1265                                                test_event_fd1,
1266                                                (const void *)list[i]);
1267                 torture_suite_add_simple_tcase_const(backend_suite,
1268                                                "fd2",
1269                                                test_event_fd2,
1270                                                (const void *)list[i]);
1271
1272                 torture_suite_add_suite(suite, backend_suite);
1273         }
1274
1275 #ifdef HAVE_PTHREAD
1276         torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1277                                              test_event_context_threaded,
1278                                              NULL);
1279
1280         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1281                                              test_multi_tevent_threaded,
1282                                              NULL);
1283
1284         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1285                                              test_multi_tevent_threaded_1,
1286                                              NULL);
1287
1288         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1289                                              test_multi_tevent_threaded_2,
1290                                              NULL);
1291
1292 #endif
1293
1294         return suite;
1295 }