test_event_fd1 error only on read
[metze/samba/wip.git] / lib / tevent / testsuite.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    testing of the events subsystem
5
6    Copyright (C) Stefan Metzmacher 2006-2009
7    Copyright (C) Jeremy Allison    2013
8
9      ** NOTE! The following LGPL license applies to the tevent
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "includes.h"
28 #include "lib/tevent/tevent.h"
29 #include "system/filesys.h"
30 #include "system/select.h"
31 #include "system/network.h"
32 #include "torture/torture.h"
33 #include "torture/local/proto.h"
34 #ifdef HAVE_PTHREAD
35 #include <pthread.h>
36 #include <assert.h>
37 #endif
38
39 static int fde_count;
40
41 static void do_read(int fd, void *buf, size_t count)
42 {
43         ssize_t ret;
44
45         do {
46                 ret = read(fd, buf, count);
47         } while (ret == -1 && errno == EINTR);
48 }
49
50 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
51                         uint16_t flags, void *private_data)
52 {
53         int *fd = (int *)private_data;
54         char c;
55 #ifdef SA_SIGINFO
56         kill(getpid(), SIGUSR1);
57 #endif
58         kill(getpid(), SIGALRM);
59
60         do_read(fd[0], &c, 1);
61         fde_count++;
62 }
63
64 static void do_write(int fd, void *buf, size_t count)
65 {
66         ssize_t ret;
67
68         do {
69                 ret = write(fd, buf, count);
70         } while (ret == -1 && errno == EINTR);
71 }
72
73 static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
74                         uint16_t flags, void *private_data)
75 {
76         int *fd = (int *)private_data;
77         char c = 0;
78
79         do_write(fd[1], &c, 1);
80 }
81
82
83 /* This will only fire if the fd's returned from pipe() are bi-directional. */
84 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
85                         uint16_t flags, void *private_data)
86 {
87         int *fd = (int *)private_data;
88         char c;
89 #ifdef SA_SIGINFO
90         kill(getpid(), SIGUSR1);
91 #endif
92         kill(getpid(), SIGALRM);
93
94         do_read(fd[1], &c, 1);
95         fde_count++;
96 }
97
98 /* This will only fire if the fd's returned from pipe() are bi-directional. */
99 static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
100                         uint16_t flags, void *private_data)
101 {
102         int *fd = (int *)private_data;
103         char c = 0;
104         do_write(fd[0], &c, 1);
105 }
106
107 static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
108                              struct timeval tval, void *private_data)
109 {
110         int *finished = (int *)private_data;
111         (*finished) = 1;
112 }
113
114 static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
115                           int signum, int count, void *info, void *private_data)
116 {
117         int *countp = (int *)private_data;
118         (*countp) += count;
119 }
120
121 static bool test_event_context(struct torture_context *test,
122                                const void *test_data)
123 {
124         struct tevent_context *ev_ctx;
125         int fd[2] = { -1, -1 };
126         const char *backend = (const char *)test_data;
127         int alarm_count=0, info_count=0;
128         struct tevent_fd *fde_read;
129         struct tevent_fd *fde_read_1;
130         struct tevent_fd *fde_write;
131         struct tevent_fd *fde_write_1;
132 #ifdef SA_RESTART
133         struct tevent_signal *se1 = NULL;
134 #endif
135 #ifdef SA_RESETHAND
136         struct tevent_signal *se2 = NULL;
137 #endif
138 #ifdef SA_SIGINFO
139         struct tevent_signal *se3 = NULL;
140 #endif
141         int finished=0;
142         struct timeval t;
143         int ret;
144
145         ev_ctx = tevent_context_init_byname(test, backend);
146         if (ev_ctx == NULL) {
147                 torture_comment(test, "event backend '%s' not supported\n", backend);
148                 return true;
149         }
150
151         torture_comment(test, "backend '%s' - %s\n",
152                         backend, __FUNCTION__);
153
154         /* reset globals */
155         fde_count = 0;
156
157         /* create a pipe */
158         ret = pipe(fd);
159         torture_assert_int_equal(test, ret, 0, "pipe failed");
160
161         fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
162                             fde_handler_read, fd);
163         fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
164                             fde_handler_write_1, fd);
165
166         fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
167                             fde_handler_write, fd);
168         fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
169                             fde_handler_read_1, fd);
170
171         tevent_fd_set_auto_close(fde_read);
172         tevent_fd_set_auto_close(fde_write);
173
174         tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
175                          finished_handler, &finished);
176
177 #ifdef SA_RESTART
178         se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
179         torture_assert(test, se1 != NULL, "failed to setup se1");
180 #endif
181 #ifdef SA_RESETHAND
182         se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
183         torture_assert(test, se2 != NULL, "failed to setup se2");
184 #endif
185 #ifdef SA_SIGINFO
186         se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
187         torture_assert(test, se3 != NULL, "failed to setup se3");
188 #endif
189
190         t = timeval_current();
191         while (!finished) {
192                 errno = 0;
193                 if (tevent_loop_once(ev_ctx) == -1) {
194                         talloc_free(ev_ctx);
195                         torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
196                 }
197         }
198
199         talloc_free(fde_read_1);
200         talloc_free(fde_write_1);
201         talloc_free(fde_read);
202         talloc_free(fde_write);
203
204         while (alarm_count < fde_count+1) {
205                 if (tevent_loop_once(ev_ctx) == -1) {
206                         break;
207                 }
208         }
209
210         torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
211
212 #ifdef SA_RESTART
213         talloc_free(se1);
214 #endif
215
216         torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
217
218 #ifdef SA_RESETHAND
219         /*
220          * we do not call talloc_free(se2)
221          * because it is already gone,
222          * after triggering the event handler.
223          */
224 #endif
225
226 #ifdef SA_SIGINFO
227         talloc_free(se3);
228         torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
229 #endif
230
231         talloc_free(ev_ctx);
232
233         return true;
234 }
235
236 struct test_event_fd1_state {
237         struct torture_context *tctx;
238         const char *backend;
239         struct tevent_context *ev;
240         int sock[2];
241         struct tevent_timer *te;
242         struct tevent_fd *fde0;
243         struct tevent_fd *fde1;
244         bool got_write;
245         bool got_read;
246         bool drain;
247         bool drain_done;
248         unsigned loop_count;
249         bool finished;
250         const char *error;
251 };
252
253 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
254                                        struct tevent_fd *fde,
255                                        uint16_t flags,
256                                        void *private_data)
257 {
258         struct test_event_fd1_state *state =
259                 (struct test_event_fd1_state *)private_data;
260
261         if (state->drain_done) {
262                 state->finished = true;
263                 state->error = __location__;
264                 return;
265         }
266
267         if (state->drain) {
268                 ssize_t ret;
269                 uint8_t c = 0;
270
271                 if (!(flags & TEVENT_FD_READ)) {
272                         state->finished = true;
273                         state->error = __location__;
274                         return;
275                 }
276
277                 ret = read(state->sock[0], &c, 1);
278                 if (ret == 1) {
279                         if (flags != (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
280                                 state->finished = true;
281                                 state->error = __location__;
282                                 return;
283                         }
284                         return;
285                 }
286
287                 /*
288                  * EOF and errors should be just reported
289                  * by TEVENT_FD_READ.
290                  */
291                 if (flags != TEVENT_FD_READ) {
292                         state->finished = true;
293                         state->error = __location__;
294                         return;
295                 }
296
297                 /*
298                  * end of test...
299                  */
300                 TEVENT_FD_NOT_READABLE(fde);
301                 state->drain_done = true;
302                 return;
303         }
304
305         if (!state->got_write) {
306                 uint8_t c = 0;
307
308                 if (flags != TEVENT_FD_WRITE) {
309                         state->finished = true;
310                         state->error = __location__;
311                         return;
312                 }
313                 state->got_write = true;
314
315                 /*
316                  * we write to the other socket...
317                  */
318                 do_write(state->sock[1], &c, 1);
319                 TEVENT_FD_NOT_WRITEABLE(fde);
320                 TEVENT_FD_READABLE(fde);
321                 return;
322         }
323
324         if (!state->got_read) {
325                 if (flags != TEVENT_FD_READ) {
326                         state->finished = true;
327                         state->error = __location__;
328                         return;
329                 }
330                 state->got_read = true;
331
332                 TEVENT_FD_NOT_READABLE(fde);
333                 return;
334         }
335
336         state->finished = true;
337         state->error = __location__;
338         return;
339 }
340
341 static void test_event_fd1_finished(struct tevent_context *ev_ctx,
342                                     struct tevent_timer *te,
343                                     struct timeval tval,
344                                     void *private_data)
345 {
346         struct test_event_fd1_state *state =
347                 (struct test_event_fd1_state *)private_data;
348
349         if (state->drain_done) {
350                 state->finished = true;
351                 return;
352         }
353
354         if (!state->got_write) {
355                 state->finished = true;
356                 state->error = __location__;
357                 return;
358         }
359
360         if (!state->got_read) {
361                 state->finished = true;
362                 state->error = __location__;
363                 return;
364         }
365
366         state->loop_count++;
367         if (state->loop_count > 3) {
368                 state->finished = true;
369                 state->error = __location__;
370                 return;
371         }
372
373         state->got_write = false;
374         state->got_read = false;
375
376         tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
377
378         if (state->loop_count > 2) {
379                 state->drain = true;
380                 TALLOC_FREE(state->fde1);
381                 TEVENT_FD_READABLE(state->fde0);
382         }
383
384         state->te = tevent_add_timer(state->ev, state->ev,
385                                     timeval_current_ofs(0,2000),
386                                     test_event_fd1_finished, state);
387 }
388
389 static bool test_event_fd1(struct torture_context *tctx,
390                            const void *test_data)
391 {
392         struct test_event_fd1_state state;
393
394         ZERO_STRUCT(state);
395         state.tctx = tctx;
396         state.backend = (const char *)test_data;
397
398         state.ev = tevent_context_init_byname(tctx, state.backend);
399         if (state.ev == NULL) {
400                 torture_skip(tctx, talloc_asprintf(tctx,
401                              "event backend '%s' not supported\n",
402                              state.backend));
403                 return true;
404         }
405
406         tevent_set_debug_stderr(state.ev);
407         torture_comment(tctx, "backend '%s' - %s\n",
408                         state.backend, __FUNCTION__);
409
410         /*
411          * This tests the following:
412          *
413          * It monitors the state of state.sock[0]
414          * with tevent_fd, but we never read/write on state.sock[0]
415          * while state.sock[1] * is only used to write a few bytes.
416          *
417          * We have a loop:
418          *   - we wait only for TEVENT_FD_WRITE on state.sock[0]
419          *   - we write 1 byte to state.sock[1]
420          *   - we wait only for TEVENT_FD_READ on state.sock[0]
421          *   - we disable events on state.sock[0]
422          *   - the timer event restarts the loop
423          * Then we close state.sock[1]
424          * We have a loop:
425          *   - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
426          *   - we try to read 1 byte
427          *   - if the read gets an error of returns 0
428          *     we disable the event handler
429          *   - the timer finishes the test
430          */
431         state.sock[0] = -1;
432         state.sock[1] = -1;
433         socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
434
435         state.te = tevent_add_timer(state.ev, state.ev,
436                                     timeval_current_ofs(0,1000),
437                                     test_event_fd1_finished, &state);
438         state.fde0 = tevent_add_fd(state.ev, state.ev,
439                                    state.sock[0], TEVENT_FD_WRITE,
440                                    test_event_fd1_fde_handler, &state);
441         /* state.fde1 is only used to auto close */
442         state.fde1 = tevent_add_fd(state.ev, state.ev,
443                                    state.sock[1], 0,
444                                    test_event_fd1_fde_handler, &state);
445
446         tevent_fd_set_auto_close(state.fde0);
447         tevent_fd_set_auto_close(state.fde1);
448
449         while (!state.finished) {
450                 errno = 0;
451                 if (tevent_loop_once(state.ev) == -1) {
452                         talloc_free(state.ev);
453                         torture_fail(tctx, talloc_asprintf(tctx,
454                                      "Failed event loop %s\n",
455                                      strerror(errno)));
456                 }
457         }
458
459         talloc_free(state.ev);
460
461         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
462                        "%s", state.error));
463
464         return true;
465 }
466
467 struct test_event_fd2_state {
468         struct torture_context *tctx;
469         const char *backend;
470         struct tevent_context *ev;
471         struct tevent_timer *te;
472         struct test_event_fd2_sock {
473                 struct test_event_fd2_state *state;
474                 int fd;
475                 struct tevent_fd *fde;
476                 size_t num_written;
477                 size_t num_read;
478                 bool got_full;
479         } sock0, sock1;
480         bool finished;
481         const char *error;
482 };
483
484 static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
485                                         struct tevent_fd *fde,
486                                         uint16_t flags,
487                                         void *private_data)
488 {
489         struct test_event_fd2_sock *cur_sock =
490                 (struct test_event_fd2_sock *)private_data;
491         struct test_event_fd2_state *state = cur_sock->state;
492         struct test_event_fd2_sock *oth_sock = NULL;
493         uint8_t v = 0, c;
494         ssize_t ret;
495
496         if (cur_sock == &state->sock0) {
497                 oth_sock = &state->sock1;
498         } else {
499                 oth_sock = &state->sock0;
500         }
501
502         if (oth_sock->num_written == 1) {
503                 if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
504                         state->finished = true;
505                         state->error = __location__;
506                         return;
507                 }
508         }
509
510         if (cur_sock->num_read == oth_sock->num_written) {
511                 state->finished = true;
512                 state->error = __location__;
513                 return;
514         }
515
516         if (!(flags & TEVENT_FD_READ)) {
517                 state->finished = true;
518                 state->error = __location__;
519                 return;
520         }
521
522         if (oth_sock->num_read >= PIPE_BUF) {
523                 /*
524                  * On Linux we become writable once we've read
525                  * one byte. On Solaris we only become writable
526                  * again once we've read 4096 bytes. PIPE_BUF
527                  * is probably a safe bet to test against.
528                  *
529                  * There should be room to write a byte again
530                  */
531                 if (!(flags & TEVENT_FD_WRITE)) {
532                         state->finished = true;
533                         state->error = __location__;
534                         return;
535                 }
536         }
537
538         if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
539                 v = (uint8_t)cur_sock->num_written;
540                 ret = write(cur_sock->fd, &v, 1);
541                 if (ret != 1) {
542                         state->finished = true;
543                         state->error = __location__;
544                         return;
545                 }
546                 cur_sock->num_written++;
547                 if (cur_sock->num_written > 0x80000000) {
548                         state->finished = true;
549                         state->error = __location__;
550                         return;
551                 }
552                 return;
553         }
554
555         if (!cur_sock->got_full) {
556                 cur_sock->got_full = true;
557
558                 if (!oth_sock->got_full) {
559                         /*
560                          * cur_sock is full,
561                          * lets wait for oth_sock
562                          * to be filled
563                          */
564                         tevent_fd_set_flags(cur_sock->fde, 0);
565                         return;
566                 }
567
568                 /*
569                  * oth_sock waited for cur_sock,
570                  * lets restart it
571                  */
572                 tevent_fd_set_flags(oth_sock->fde,
573                                     TEVENT_FD_READ|TEVENT_FD_WRITE);
574         }
575
576         ret = read(cur_sock->fd, &v, 1);
577         if (ret != 1) {
578                 state->finished = true;
579                 state->error = __location__;
580                 return;
581         }
582         c = (uint8_t)cur_sock->num_read;
583         if (c != v) {
584                 state->finished = true;
585                 state->error = __location__;
586                 return;
587         }
588         cur_sock->num_read++;
589
590         if (cur_sock->num_read < oth_sock->num_written) {
591                 /* there is more to read */
592                 return;
593         }
594         /*
595          * we read everything, we need to remove TEVENT_FD_WRITE
596          * to avoid spinning
597          */
598         TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);
599
600         if (oth_sock->num_read == cur_sock->num_written) {
601                 /*
602                  * both directions are finished
603                  */
604                 state->finished = true;
605         }
606
607         return;
608 }
609
610 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
611                                     struct tevent_timer *te,
612                                     struct timeval tval,
613                                     void *private_data)
614 {
615         struct test_event_fd2_state *state =
616                 (struct test_event_fd2_state *)private_data;
617
618         /*
619          * this should never be triggered
620          */
621         state->finished = true;
622         state->error = __location__;
623 }
624
625 static bool test_event_fd2(struct torture_context *tctx,
626                            const void *test_data)
627 {
628         struct test_event_fd2_state state;
629         int sock[2];
630         uint8_t c = 0;
631
632         ZERO_STRUCT(state);
633         state.tctx = tctx;
634         state.backend = (const char *)test_data;
635
636         state.ev = tevent_context_init_byname(tctx, state.backend);
637         if (state.ev == NULL) {
638                 torture_skip(tctx, talloc_asprintf(tctx,
639                              "event backend '%s' not supported\n",
640                              state.backend));
641                 return true;
642         }
643
644         tevent_set_debug_stderr(state.ev);
645         torture_comment(tctx, "backend '%s' - %s\n",
646                         state.backend, __FUNCTION__);
647
648         /*
649          * This tests the following
650          *
651          * - We write 1 byte to each socket
652          * - We wait for TEVENT_FD_READ/WRITE on both sockets
653          * - When we get TEVENT_FD_WRITE we write 1 byte
654          *   until both socket buffers are full, which
655          *   means both sockets only get TEVENT_FD_READ.
656          * - Then we read 1 byte until we have consumed
657          *   all bytes the other end has written.
658          */
659         sock[0] = -1;
660         sock[1] = -1;
661         socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
662
663         /*
664          * the timer should never expire
665          */
666         state.te = tevent_add_timer(state.ev, state.ev,
667                                     timeval_current_ofs(600, 0),
668                                     test_event_fd2_finished, &state);
669         state.sock0.state = &state;
670         state.sock0.fd = sock[0];
671         state.sock0.fde = tevent_add_fd(state.ev, state.ev,
672                                         state.sock0.fd,
673                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
674                                         test_event_fd2_sock_handler,
675                                         &state.sock0);
676         state.sock1.state = &state;
677         state.sock1.fd = sock[1];
678         state.sock1.fde = tevent_add_fd(state.ev, state.ev,
679                                         state.sock1.fd,
680                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
681                                         test_event_fd2_sock_handler,
682                                         &state.sock1);
683
684         tevent_fd_set_auto_close(state.sock0.fde);
685         tevent_fd_set_auto_close(state.sock1.fde);
686
687         do_write(state.sock0.fd, &c, 1);
688         state.sock0.num_written++;
689         do_write(state.sock1.fd, &c, 1);
690         state.sock1.num_written++;
691
692         while (!state.finished) {
693                 errno = 0;
694                 if (tevent_loop_once(state.ev) == -1) {
695                         talloc_free(state.ev);
696                         torture_fail(tctx, talloc_asprintf(tctx,
697                                      "Failed event loop %s\n",
698                                      strerror(errno)));
699                 }
700         }
701
702         talloc_free(state.ev);
703
704         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
705                        "%s", state.error));
706
707         return true;
708 }
709
710 #ifdef HAVE_PTHREAD
711
712 static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
713 static bool do_shutdown = false;
714
715 static void test_event_threaded_lock(void)
716 {
717         int ret;
718         ret = pthread_mutex_lock(&threaded_mutex);
719         assert(ret == 0);
720 }
721
722 static void test_event_threaded_unlock(void)
723 {
724         int ret;
725         ret = pthread_mutex_unlock(&threaded_mutex);
726         assert(ret == 0);
727 }
728
729 static void test_event_threaded_trace(enum tevent_trace_point point,
730                                       void *private_data)
731 {
732         switch (point) {
733         case TEVENT_TRACE_BEFORE_WAIT:
734                 test_event_threaded_unlock();
735                 break;
736         case TEVENT_TRACE_AFTER_WAIT:
737                 test_event_threaded_lock();
738                 break;
739         case TEVENT_TRACE_BEFORE_LOOP_ONCE:
740         case TEVENT_TRACE_AFTER_LOOP_ONCE:
741                 break;
742         }
743 }
744
745 static void test_event_threaded_timer(struct tevent_context *ev,
746                                       struct tevent_timer *te,
747                                       struct timeval current_time,
748                                       void *private_data)
749 {
750         return;
751 }
752
753 static void *test_event_poll_thread(void *private_data)
754 {
755         struct tevent_context *ev = (struct tevent_context *)private_data;
756
757         test_event_threaded_lock();
758
759         while (true) {
760                 int ret;
761                 ret = tevent_loop_once(ev);
762                 assert(ret == 0);
763                 if (do_shutdown) {
764                         test_event_threaded_unlock();
765                         return NULL;
766                 }
767         }
768
769 }
770
771 static void test_event_threaded_read_handler(struct tevent_context *ev,
772                                              struct tevent_fd *fde,
773                                              uint16_t flags,
774                                              void *private_data)
775 {
776         int *pfd = (int *)private_data;
777         char c;
778         ssize_t nread;
779
780         if ((flags & TEVENT_FD_READ) == 0) {
781                 return;
782         }
783
784         do {
785                 nread = read(*pfd, &c, 1);
786         } while ((nread == -1) && (errno == EINTR));
787
788         assert(nread == 1);
789 }
790
791 static bool test_event_context_threaded(struct torture_context *test,
792                                         const void *test_data)
793 {
794         struct tevent_context *ev;
795         struct tevent_timer *te;
796         struct tevent_fd *fde;
797         pthread_t poll_thread;
798         int fds[2];
799         int ret;
800         char c = 0;
801
802         ev = tevent_context_init_byname(test, "poll_mt");
803         torture_assert(test, ev != NULL, "poll_mt not supported");
804
805         tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
806
807         te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
808                               test_event_threaded_timer, NULL);
809         torture_assert(test, te != NULL, "Could not add timer");
810
811         ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
812         torture_assert(test, ret == 0, "Could not create poll thread");
813
814         ret = pipe(fds);
815         torture_assert(test, ret == 0, "Could not create pipe");
816
817         poll(NULL, 0, 100);
818
819         test_event_threaded_lock();
820
821         fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
822                             test_event_threaded_read_handler, &fds[0]);
823         torture_assert(test, fde != NULL, "Could not add fd event");
824
825         test_event_threaded_unlock();
826
827         poll(NULL, 0, 100);
828
829         do_write(fds[1], &c, 1);
830
831         poll(NULL, 0, 100);
832
833         test_event_threaded_lock();
834         do_shutdown = true;
835         test_event_threaded_unlock();
836
837         do_write(fds[1], &c, 1);
838
839         ret = pthread_join(poll_thread, NULL);
840         torture_assert(test, ret == 0, "pthread_join failed");
841
842         return true;
843 }
844
845 #define NUM_TEVENT_THREADS 100
846
847 /* Ugly, but needed for torture_comment... */
848 static struct torture_context *thread_test_ctx;
849 static pthread_t thread_map[NUM_TEVENT_THREADS];
850 static unsigned thread_counter;
851
852 /* Called in master thread context */
853 static void callback_nowait(struct tevent_context *ev,
854                                 struct tevent_immediate *im,
855                                 void *private_ptr)
856 {
857         pthread_t *thread_id_ptr =
858                 talloc_get_type_abort(private_ptr, pthread_t);
859         unsigned i;
860
861         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
862                 if (pthread_equal(*thread_id_ptr,
863                                 thread_map[i])) {
864                         break;
865                 }
866         }
867         torture_comment(thread_test_ctx,
868                         "Callback %u from thread %u\n",
869                         thread_counter,
870                         i);
871         thread_counter++;
872 }
873
874 /* Blast the master tevent_context with a callback, no waiting. */
875 static void *thread_fn_nowait(void *private_ptr)
876 {
877         struct tevent_thread_proxy *master_tp =
878                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
879         struct tevent_immediate *im;
880         pthread_t *thread_id_ptr;
881
882         im = tevent_create_immediate(NULL);
883         if (im == NULL) {
884                 return NULL;
885         }
886         thread_id_ptr = talloc(NULL, pthread_t);
887         if (thread_id_ptr == NULL) {
888                 return NULL;
889         }
890         *thread_id_ptr = pthread_self();
891
892         tevent_thread_proxy_schedule(master_tp,
893                                 &im,
894                                 callback_nowait,
895                                 &thread_id_ptr);
896         return NULL;
897 }
898
899 static void timeout_fn(struct tevent_context *ev,
900                         struct tevent_timer *te,
901                         struct timeval tv, void *p)
902 {
903         thread_counter = NUM_TEVENT_THREADS * 10;
904 }
905
906 static bool test_multi_tevent_threaded(struct torture_context *test,
907                                         const void *test_data)
908 {
909         unsigned i;
910         struct tevent_context *master_ev;
911         struct tevent_thread_proxy *tp;
912
913         talloc_disable_null_tracking();
914
915         /* Ugly global stuff. */
916         thread_test_ctx = test;
917         thread_counter = 0;
918
919         master_ev = tevent_context_init(NULL);
920         if (master_ev == NULL) {
921                 return false;
922         }
923         tevent_set_debug_stderr(master_ev);
924
925         tp = tevent_thread_proxy_create(master_ev);
926         if (tp == NULL) {
927                 torture_fail(test,
928                         talloc_asprintf(test,
929                                 "tevent_thread_proxy_create failed\n"));
930                 talloc_free(master_ev);
931                 return false;
932         }
933
934         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
935                 int ret = pthread_create(&thread_map[i],
936                                 NULL,
937                                 thread_fn_nowait,
938                                 tp);
939                 if (ret != 0) {
940                         torture_fail(test,
941                                 talloc_asprintf(test,
942                                         "Failed to create thread %i, %d\n",
943                                         i, ret));
944                         return false;
945                 }
946         }
947
948         /* Ensure we don't wait more than 10 seconds. */
949         tevent_add_timer(master_ev,
950                         master_ev,
951                         timeval_current_ofs(10,0),
952                         timeout_fn,
953                         NULL);
954
955         while (thread_counter < NUM_TEVENT_THREADS) {
956                 int ret = tevent_loop_once(master_ev);
957                 torture_assert(test, ret == 0, "tevent_loop_once failed");
958         }
959
960         torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
961                 "thread_counter fail\n");
962
963         talloc_free(master_ev);
964         return true;
965 }
966
967 struct reply_state {
968         struct tevent_thread_proxy *reply_tp;
969         pthread_t thread_id;
970         int *p_finished;
971 };
972
973 static void thread_timeout_fn(struct tevent_context *ev,
974                         struct tevent_timer *te,
975                         struct timeval tv, void *p)
976 {
977         int *p_finished = (int *)p;
978
979         *p_finished = 2;
980 }
981
982 /* Called in child-thread context */
983 static void thread_callback(struct tevent_context *ev,
984                                 struct tevent_immediate *im,
985                                 void *private_ptr)
986 {
987         struct reply_state *rsp =
988                 talloc_get_type_abort(private_ptr, struct reply_state);
989
990         talloc_steal(ev, rsp);
991         *rsp->p_finished = 1;
992 }
993
994 /* Called in master thread context */
995 static void master_callback(struct tevent_context *ev,
996                                 struct tevent_immediate *im,
997                                 void *private_ptr)
998 {
999         struct reply_state *rsp =
1000                 talloc_get_type_abort(private_ptr, struct reply_state);
1001         unsigned i;
1002
1003         talloc_steal(ev, rsp);
1004
1005         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1006                 if (pthread_equal(rsp->thread_id,
1007                                 thread_map[i])) {
1008                         break;
1009                 }
1010         }
1011         torture_comment(thread_test_ctx,
1012                         "Callback %u from thread %u\n",
1013                         thread_counter,
1014                         i);
1015         /* Now reply to the thread ! */
1016         tevent_thread_proxy_schedule(rsp->reply_tp,
1017                                 &im,
1018                                 thread_callback,
1019                                 &rsp);
1020
1021         thread_counter++;
1022 }
1023
1024 static void *thread_fn_1(void *private_ptr)
1025 {
1026         struct tevent_thread_proxy *master_tp =
1027                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1028         struct tevent_thread_proxy *tp;
1029         struct tevent_immediate *im;
1030         struct tevent_context *ev;
1031         struct reply_state *rsp;
1032         int finished = 0;
1033         int ret;
1034
1035         ev = tevent_context_init(NULL);
1036         if (ev == NULL) {
1037                 return NULL;
1038         }
1039
1040         tp = tevent_thread_proxy_create(ev);
1041         if (tp == NULL) {
1042                 talloc_free(ev);
1043                 return NULL;
1044         }
1045
1046         im = tevent_create_immediate(ev);
1047         if (im == NULL) {
1048                 talloc_free(ev);
1049                 return NULL;
1050         }
1051
1052         rsp = talloc(ev, struct reply_state);
1053         if (rsp == NULL) {
1054                 talloc_free(ev);
1055                 return NULL;
1056         }
1057
1058         rsp->thread_id = pthread_self();
1059         rsp->reply_tp = tp;
1060         rsp->p_finished = &finished;
1061
1062         /* Introduce a little randomness into the mix.. */
1063         usleep(random() % 7000);
1064
1065         tevent_thread_proxy_schedule(master_tp,
1066                                 &im,
1067                                 master_callback,
1068                                 &rsp);
1069
1070         /* Ensure we don't wait more than 10 seconds. */
1071         tevent_add_timer(ev,
1072                         ev,
1073                         timeval_current_ofs(10,0),
1074                         thread_timeout_fn,
1075                         &finished);
1076
1077         while (finished == 0) {
1078                 ret = tevent_loop_once(ev);
1079                 assert(ret == 0);
1080         }
1081
1082         if (finished > 1) {
1083                 /* Timeout ! */
1084                 abort();
1085         }
1086
1087         /*
1088          * NB. We should talloc_free(ev) here, but if we do
1089          * we currently get hit by helgrind Fix #323432
1090          * "When calling pthread_cond_destroy or pthread_mutex_destroy
1091          * with initializers as argument Helgrind (incorrectly) reports errors."
1092          *
1093          * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1094          * with-pthread-mutex-destroy-td47757.html
1095          *
1096          * Helgrind doesn't understand that the request/reply
1097          * messages provide synchronization between the lock/unlock
1098          * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1099          * when the struct tevent_thread_proxy object is talloc_free'd.
1100          *
1101          * As a work-around for now return ev for the parent thread to free.
1102          */
1103         return ev;
1104 }
1105
1106 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1107                                         const void *test_data)
1108 {
1109         unsigned i;
1110         struct tevent_context *master_ev;
1111         struct tevent_thread_proxy *master_tp;
1112         int ret;
1113
1114         talloc_disable_null_tracking();
1115
1116         /* Ugly global stuff. */
1117         thread_test_ctx = test;
1118         thread_counter = 0;
1119
1120         master_ev = tevent_context_init(NULL);
1121         if (master_ev == NULL) {
1122                 return false;
1123         }
1124         tevent_set_debug_stderr(master_ev);
1125
1126         master_tp = tevent_thread_proxy_create(master_ev);
1127         if (master_tp == NULL) {
1128                 torture_fail(test,
1129                         talloc_asprintf(test,
1130                                 "tevent_thread_proxy_create failed\n"));
1131                 talloc_free(master_ev);
1132                 return false;
1133         }
1134
1135         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1136                 ret = pthread_create(&thread_map[i],
1137                                 NULL,
1138                                 thread_fn_1,
1139                                 master_tp);
1140                 if (ret != 0) {
1141                         torture_fail(test,
1142                                 talloc_asprintf(test,
1143                                         "Failed to create thread %i, %d\n",
1144                                         i, ret));
1145                                 return false;
1146                 }
1147         }
1148
1149         while (thread_counter < NUM_TEVENT_THREADS) {
1150                 ret = tevent_loop_once(master_ev);
1151                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1152         }
1153
1154         /* Wait for all the threads to finish - join 'em. */
1155         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1156                 void *retval;
1157                 ret = pthread_join(thread_map[i], &retval);
1158                 torture_assert(test, ret == 0, "pthread_join failed");
1159                 /* Free the child thread event context. */
1160                 talloc_free(retval);
1161         }
1162
1163         talloc_free(master_ev);
1164         return true;
1165 }
1166
1167 struct threaded_test_2 {
1168         struct tevent_threaded_context *tctx;
1169         struct tevent_immediate *im;
1170         pthread_t thread_id;
1171 };
1172
1173 static void master_callback_2(struct tevent_context *ev,
1174                               struct tevent_immediate *im,
1175                               void *private_data);
1176
1177 static void *thread_fn_2(void *private_data)
1178 {
1179         struct threaded_test_2 *state = private_data;
1180
1181         state->thread_id = pthread_self();
1182
1183         usleep(random() % 7000);
1184
1185         tevent_threaded_schedule_immediate(
1186                 state->tctx, state->im, master_callback_2, state);
1187
1188         return NULL;
1189 }
1190
1191 static void master_callback_2(struct tevent_context *ev,
1192                               struct tevent_immediate *im,
1193                               void *private_data)
1194 {
1195         struct threaded_test_2 *state = private_data;
1196         int i;
1197
1198         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1199                 if (pthread_equal(state->thread_id, thread_map[i])) {
1200                         break;
1201                 }
1202         }
1203         torture_comment(thread_test_ctx,
1204                         "Callback_2 %u from thread %u\n",
1205                         thread_counter,
1206                         i);
1207         thread_counter++;
1208 }
1209
1210 static bool test_multi_tevent_threaded_2(struct torture_context *test,
1211                                          const void *test_data)
1212 {
1213         unsigned i;
1214
1215         struct tevent_context *ev;
1216         struct tevent_threaded_context *tctx;
1217         int ret;
1218
1219         thread_test_ctx = test;
1220         thread_counter = 0;
1221
1222         ev = tevent_context_init(test);
1223         torture_assert(test, ev != NULL, "tevent_context_init failed");
1224
1225         tctx = tevent_threaded_context_create(ev, ev);
1226         torture_assert(test, tctx != NULL,
1227                        "tevent_threaded_context_create failed");
1228
1229         for (i=0; i<NUM_TEVENT_THREADS; i++) {
1230                 struct threaded_test_2 *state;
1231
1232                 state = talloc(ev, struct threaded_test_2);
1233                 torture_assert(test, state != NULL, "talloc failed");
1234
1235                 state->tctx = tctx;
1236                 state->im = tevent_create_immediate(state);
1237                 torture_assert(test, state->im != NULL,
1238                                "tevent_create_immediate failed");
1239
1240                 ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1241                 torture_assert(test, ret == 0, "pthread_create failed");
1242         }
1243
1244         while (thread_counter < NUM_TEVENT_THREADS) {
1245                 ret = tevent_loop_once(ev);
1246                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1247         }
1248
1249         /* Wait for all the threads to finish - join 'em. */
1250         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1251                 void *retval;
1252                 ret = pthread_join(thread_map[i], &retval);
1253                 torture_assert(test, ret == 0, "pthread_join failed");
1254                 /* Free the child thread event context. */
1255         }
1256
1257         talloc_free(tctx);
1258         talloc_free(ev);
1259         return true;
1260 }
1261 #endif
1262
1263 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1264 {
1265         struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1266         const char **list = tevent_backend_list(suite);
1267         int i;
1268
1269         for (i=0;list && list[i];i++) {
1270                 struct torture_suite *backend_suite;
1271
1272                 backend_suite = torture_suite_create(mem_ctx, list[i]);
1273
1274                 torture_suite_add_simple_tcase_const(backend_suite,
1275                                                "context",
1276                                                test_event_context,
1277                                                (const void *)list[i]);
1278                 torture_suite_add_simple_tcase_const(backend_suite,
1279                                                "fd1",
1280                                                test_event_fd1,
1281                                                (const void *)list[i]);
1282                 torture_suite_add_simple_tcase_const(backend_suite,
1283                                                "fd2",
1284                                                test_event_fd2,
1285                                                (const void *)list[i]);
1286
1287                 torture_suite_add_suite(suite, backend_suite);
1288         }
1289
1290 #ifdef HAVE_PTHREAD
1291         torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1292                                              test_event_context_threaded,
1293                                              NULL);
1294
1295         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1296                                              test_multi_tevent_threaded,
1297                                              NULL);
1298
1299         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1300                                              test_multi_tevent_threaded_1,
1301                                              NULL);
1302
1303         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1304                                              test_multi_tevent_threaded_2,
1305                                              NULL);
1306
1307 #endif
1308
1309         return suite;
1310 }