492b88079966ae12749bfb765aa06701d5db7cac
[ab/samba-autobuild/.git] / lib / tevent / testsuite.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    testing of the events subsystem
5
6    Copyright (C) Stefan Metzmacher 2006-2009
7    Copyright (C) Jeremy Allison    2013
8
9      ** NOTE! The following LGPL license applies to the tevent
10      ** library. This does NOT imply that all of Samba is released
11      ** under the LGPL
12
13    This library is free software; you can redistribute it and/or
14    modify it under the terms of the GNU Lesser General Public
15    License as published by the Free Software Foundation; either
16    version 3 of the License, or (at your option) any later version.
17
18    This library is distributed in the hope that it will be useful,
19    but WITHOUT ANY WARRANTY; without even the implied warranty of
20    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
21    Lesser General Public License for more details.
22
23    You should have received a copy of the GNU Lesser General Public
24    License along with this library; if not, see <http://www.gnu.org/licenses/>.
25 */
26
27 #include "includes.h"
28 #define TEVENT_DEPRECATED 1
29 #include "tevent.h"
30 #include "system/filesys.h"
31 #include "system/select.h"
32 #include "system/network.h"
33 #include "torture/torture.h"
34 #include "torture/local/proto.h"
35 #ifdef HAVE_PTHREAD
36 #include "system/threads.h"
37 #include <assert.h>
38 #endif
39
40 static struct tevent_context *
41 test_tevent_context_init(TALLOC_CTX *mem_ctx)
42 {
43         struct tevent_context *ev = NULL;
44
45         ev = tevent_context_init(mem_ctx);
46         if (ev != NULL) {
47                 samba_tevent_set_debug(ev, "<default>");
48         }
49
50         return ev;
51 }
52
53 static struct tevent_context *
54 test_tevent_context_init_byname(TALLOC_CTX *mem_ctx, const char *name)
55 {
56         struct tevent_context *ev = NULL;
57
58         ev = tevent_context_init_byname(mem_ctx, name);
59         if (ev != NULL) {
60                 samba_tevent_set_debug(ev, name);
61         }
62
63         return ev;
64 }
65
/*
 * Number of read events handled by fde_handler_read() and
 * fde_handler_read_1(); reset to 0 at the start of test_event_context().
 */
static int fde_count;
67
/*
 * read() up to 'count' bytes from 'fd' into 'buf', restarting the call
 * whenever it is interrupted by a signal (EINTR).
 *
 * Any other outcome (data, EOF or a different error) ends the loop and is
 * deliberately not reported to the caller.
 */
static void do_read(int fd, void *buf, size_t count)
{
	for (;;) {
		ssize_t nread = read(fd, buf, count);

		if (nread != -1 || errno != EINTR) {
			break;
		}
	}
}
76
77 static void fde_handler_read(struct tevent_context *ev_ctx, struct tevent_fd *f,
78                         uint16_t flags, void *private_data)
79 {
80         int *fd = (int *)private_data;
81         char c;
82 #ifdef SA_SIGINFO
83         kill(getpid(), SIGUSR1);
84 #endif
85         kill(getpid(), SIGALRM);
86
87         do_read(fd[0], &c, 1);
88         fde_count++;
89 }
90
/*
 * write() up to 'count' bytes from 'buf' to 'fd', restarting the call
 * whenever it is interrupted by a signal (EINTR).
 *
 * Short writes and other errors end the loop and are deliberately not
 * reported to the caller.
 */
static void do_write(int fd, void *buf, size_t count)
{
	for (;;) {
		ssize_t nwritten = write(fd, buf, count);

		if (nwritten != -1 || errno != EINTR) {
			break;
		}
	}
}
99
/*
 * fd event handler attached to the write end of the test pipe.
 *
 * private_data is the int[2] array filled in by pipe(); every invocation
 * pushes a single zero byte into fd[1].
 */
static void fde_handler_write(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *pipefd = private_data;
	char zero = 0;

	do_write(pipefd[1], &zero, 1);
}
108
109
110 /* This will only fire if the fd's returned from pipe() are bi-directional. */
111 static void fde_handler_read_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
112                         uint16_t flags, void *private_data)
113 {
114         int *fd = (int *)private_data;
115         char c;
116 #ifdef SA_SIGINFO
117         kill(getpid(), SIGUSR1);
118 #endif
119         kill(getpid(), SIGALRM);
120
121         do_read(fd[1], &c, 1);
122         fde_count++;
123 }
124
/*
 * Write handler for fd[0], the end of the pipe that is normally read-only.
 * It can only fire when the descriptors returned by pipe() are
 * bi-directional.
 *
 * Every invocation pushes a single zero byte into fd[0].
 */
static void fde_handler_write_1(struct tevent_context *ev_ctx, struct tevent_fd *f,
			uint16_t flags, void *private_data)
{
	int *pipefd = private_data;
	char zero = 0;

	do_write(pipefd[0], &zero, 1);
}
133
/*
 * Timer callback: private_data points at an int flag which is set to 1 so
 * that the caller's event loop stops. All other arguments are ignored.
 */
static void finished_handler(struct tevent_context *ev_ctx, struct tevent_timer *te,
			     struct timeval tval, void *private_data)
{
	int *done = private_data;

	*done = 1;
}
140
/*
 * Signal callback: private_data points at an int accumulator to which the
 * 'count' value passed in by tevent is added. signum and info are ignored.
 */
static void count_handler(struct tevent_context *ev_ctx, struct tevent_signal *te,
			  int signum, int count, void *info, void *private_data)
{
	int *total = private_data;

	*total = *total + count;
}
147
148 static bool test_event_context(struct torture_context *test,
149                                const void *test_data)
150 {
151         struct tevent_context *ev_ctx;
152         int fd[2] = { -1, -1 };
153         const char *backend = (const char *)test_data;
154         int alarm_count=0, info_count=0;
155         struct tevent_fd *fde_read;
156         struct tevent_fd *fde_read_1;
157         struct tevent_fd *fde_write;
158         struct tevent_fd *fde_write_1;
159 #ifdef SA_RESTART
160         struct tevent_signal *se1 = NULL;
161 #endif
162 #ifdef SA_RESETHAND
163         struct tevent_signal *se2 = NULL;
164 #endif
165 #ifdef SA_SIGINFO
166         struct tevent_signal *se3 = NULL;
167 #endif
168         int finished=0;
169         struct timeval t;
170         int ret;
171
172         ev_ctx = test_tevent_context_init_byname(test, backend);
173         if (ev_ctx == NULL) {
174                 torture_comment(test, "event backend '%s' not supported\n", backend);
175                 return true;
176         }
177
178         torture_comment(test, "backend '%s' - %s\n",
179                         backend, __FUNCTION__);
180
181         /* reset globals */
182         fde_count = 0;
183
184         /* create a pipe */
185         ret = pipe(fd);
186         torture_assert_int_equal(test, ret, 0, "pipe failed");
187
188         fde_read = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_READ,
189                             fde_handler_read, fd);
190         fde_write_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[0], TEVENT_FD_WRITE,
191                             fde_handler_write_1, fd);
192
193         fde_write = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_WRITE,
194                             fde_handler_write, fd);
195         fde_read_1 = tevent_add_fd(ev_ctx, ev_ctx, fd[1], TEVENT_FD_READ,
196                             fde_handler_read_1, fd);
197
198         tevent_fd_set_auto_close(fde_read);
199         tevent_fd_set_auto_close(fde_write);
200
201         tevent_add_timer(ev_ctx, ev_ctx, timeval_current_ofs(2,0),
202                          finished_handler, &finished);
203
204 #ifdef SA_RESTART
205         se1 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESTART, count_handler, &alarm_count);
206         torture_assert(test, se1 != NULL, "failed to setup se1");
207 #endif
208 #ifdef SA_RESETHAND
209         se2 = tevent_add_signal(ev_ctx, ev_ctx, SIGALRM, SA_RESETHAND, count_handler, &alarm_count);
210         torture_assert(test, se2 != NULL, "failed to setup se2");
211 #endif
212 #ifdef SA_SIGINFO
213         se3 = tevent_add_signal(ev_ctx, ev_ctx, SIGUSR1, SA_SIGINFO, count_handler, &info_count);
214         torture_assert(test, se3 != NULL, "failed to setup se3");
215 #endif
216
217         t = timeval_current();
218         while (!finished) {
219                 errno = 0;
220                 if (tevent_loop_once(ev_ctx) == -1) {
221                         TALLOC_FREE(ev_ctx);
222                         torture_fail(test, talloc_asprintf(test, "Failed event loop %s\n", strerror(errno)));
223                         return false;
224                 }
225         }
226
227         talloc_free(fde_read_1);
228         talloc_free(fde_write_1);
229         talloc_free(fde_read);
230         talloc_free(fde_write);
231
232         while (alarm_count < fde_count+1) {
233                 if (tevent_loop_once(ev_ctx) == -1) {
234                         break;
235                 }
236         }
237
238         torture_comment(test, "Got %.2f pipe events/sec\n", fde_count/timeval_elapsed(&t));
239
240 #ifdef SA_RESTART
241         talloc_free(se1);
242 #endif
243
244         torture_assert_int_equal(test, alarm_count, 1+fde_count, "alarm count mismatch");
245
246 #ifdef SA_RESETHAND
247         /*
248          * we do not call talloc_free(se2)
249          * because it is already gone,
250          * after triggering the event handler.
251          */
252 #endif
253
254 #ifdef SA_SIGINFO
255         talloc_free(se3);
256         torture_assert_int_equal(test, info_count, fde_count, "info count mismatch");
257 #endif
258
259         talloc_free(ev_ctx);
260
261         return true;
262 }
263
/* Shared state of test_event_fd1() and its fd/timer callbacks. */
struct test_event_fd1_state {
	struct torture_context *tctx;
	const char *backend;		/* backend name taken from test_data */
	struct tevent_context *ev;
	int sock[2];			/* filled in by socketpair() */
	struct tevent_timer *te;	/* timer that drives test_event_fd1_finished() */
	struct tevent_fd *fde0;		/* event on sock[0], the monitored socket */
	struct tevent_fd *fde1;		/* event on sock[1], only used to auto close it */
	bool got_write;			/* handler saw TEVENT_FD_WRITE in this round */
	bool got_read;			/* handler saw TEVENT_FD_READ in this round */
	bool drain;			/* set by the timer once loop_count exceeds 2 */
	bool drain_done;		/* set when read() in drain mode does not return 1 */
	unsigned loop_count;		/* rounds restarted by the timer so far */
	bool finished;			/* tells the main loop to stop */
	const char *error;		/* __location__ of the failed check, NULL on success */
};
280
281 static void test_event_fd1_fde_handler(struct tevent_context *ev_ctx,
282                                        struct tevent_fd *fde,
283                                        uint16_t flags,
284                                        void *private_data)
285 {
286         struct test_event_fd1_state *state =
287                 (struct test_event_fd1_state *)private_data;
288
289         if (state->drain_done) {
290                 state->finished = true;
291                 state->error = __location__;
292                 return;
293         }
294
295         if (state->drain) {
296                 ssize_t ret;
297                 uint8_t c = 0;
298
299                 if (!(flags & TEVENT_FD_READ)) {
300                         state->finished = true;
301                         state->error = __location__;
302                         return;
303                 }
304
305                 ret = read(state->sock[0], &c, 1);
306                 if (ret == 1) {
307                         return;
308                 }
309
310                 /*
311                  * end of test...
312                  */
313                 tevent_fd_set_flags(fde, 0);
314                 state->drain_done = true;
315                 return;
316         }
317
318         if (!state->got_write) {
319                 uint8_t c = 0;
320
321                 if (flags != TEVENT_FD_WRITE) {
322                         state->finished = true;
323                         state->error = __location__;
324                         return;
325                 }
326                 state->got_write = true;
327
328                 /*
329                  * we write to the other socket...
330                  */
331                 do_write(state->sock[1], &c, 1);
332                 TEVENT_FD_NOT_WRITEABLE(fde);
333                 TEVENT_FD_READABLE(fde);
334                 return;
335         }
336
337         if (!state->got_read) {
338                 if (flags != TEVENT_FD_READ) {
339                         state->finished = true;
340                         state->error = __location__;
341                         return;
342                 }
343                 state->got_read = true;
344
345                 TEVENT_FD_NOT_READABLE(fde);
346                 return;
347         }
348
349         state->finished = true;
350         state->error = __location__;
351         return;
352 }
353
354 static void test_event_fd1_finished(struct tevent_context *ev_ctx,
355                                     struct tevent_timer *te,
356                                     struct timeval tval,
357                                     void *private_data)
358 {
359         struct test_event_fd1_state *state =
360                 (struct test_event_fd1_state *)private_data;
361
362         if (state->drain_done) {
363                 state->finished = true;
364                 return;
365         }
366
367         if (!state->got_write) {
368                 state->finished = true;
369                 state->error = __location__;
370                 return;
371         }
372
373         if (!state->got_read) {
374                 state->finished = true;
375                 state->error = __location__;
376                 return;
377         }
378
379         state->loop_count++;
380         if (state->loop_count > 3) {
381                 state->finished = true;
382                 state->error = __location__;
383                 return;
384         }
385
386         state->got_write = false;
387         state->got_read = false;
388
389         tevent_fd_set_flags(state->fde0, TEVENT_FD_WRITE);
390
391         if (state->loop_count > 2) {
392                 state->drain = true;
393                 TALLOC_FREE(state->fde1);
394                 TEVENT_FD_READABLE(state->fde0);
395         }
396
397         state->te = tevent_add_timer(state->ev, state->ev,
398                                     timeval_current_ofs(0,2000),
399                                     test_event_fd1_finished, state);
400 }
401
402 static bool test_event_fd1(struct torture_context *tctx,
403                            const void *test_data)
404 {
405         struct test_event_fd1_state state;
406         int ret;
407
408         ZERO_STRUCT(state);
409         state.tctx = tctx;
410         state.backend = (const char *)test_data;
411
412         state.ev = test_tevent_context_init_byname(tctx, state.backend);
413         if (state.ev == NULL) {
414                 torture_skip(tctx, talloc_asprintf(tctx,
415                              "event backend '%s' not supported\n",
416                              state.backend));
417                 return true;
418         }
419
420         torture_comment(tctx, "backend '%s' - %s\n",
421                         state.backend, __FUNCTION__);
422
423         /*
424          * This tests the following:
425          *
426          * It monitors the state of state.sock[0]
427          * with tevent_fd, but we never read/write on state.sock[0]
428          * while state.sock[1] * is only used to write a few bytes.
429          *
430          * We have a loop:
431          *   - we wait only for TEVENT_FD_WRITE on state.sock[0]
432          *   - we write 1 byte to state.sock[1]
433          *   - we wait only for TEVENT_FD_READ on state.sock[0]
434          *   - we disable events on state.sock[0]
435          *   - the timer event restarts the loop
436          * Then we close state.sock[1]
437          * We have a loop:
438          *   - we wait for TEVENT_FD_READ/WRITE on state.sock[0]
439          *   - we try to read 1 byte
440          *   - if the read gets an error of returns 0
441          *     we disable the event handler
442          *   - the timer finishes the test
443          */
444         state.sock[0] = -1;
445         state.sock[1] = -1;
446
447         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, state.sock);
448         torture_assert(tctx, ret == 0, "socketpair() failed");
449
450         state.te = tevent_add_timer(state.ev, state.ev,
451                                     timeval_current_ofs(0,10000),
452                                     test_event_fd1_finished, &state);
453         state.fde0 = tevent_add_fd(state.ev, state.ev,
454                                    state.sock[0], TEVENT_FD_WRITE,
455                                    test_event_fd1_fde_handler, &state);
456         /* state.fde1 is only used to auto close */
457         state.fde1 = tevent_add_fd(state.ev, state.ev,
458                                    state.sock[1], 0,
459                                    test_event_fd1_fde_handler, &state);
460
461         tevent_fd_set_auto_close(state.fde0);
462         tevent_fd_set_auto_close(state.fde1);
463
464         while (!state.finished) {
465                 errno = 0;
466                 if (tevent_loop_once(state.ev) == -1) {
467                         talloc_free(state.ev);
468                         torture_fail(tctx, talloc_asprintf(tctx,
469                                      "Failed event loop %s\n",
470                                      strerror(errno)));
471                 }
472         }
473
474         talloc_free(state.ev);
475
476         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
477                        "%s", state.error));
478
479         return true;
480 }
481
/* Shared state of test_event_fd2() and its fd/timer callbacks. */
struct test_event_fd2_state {
	struct torture_context *tctx;
	const char *backend;		/* backend name taken from test_data */
	struct tevent_context *ev;
	struct tevent_timer *te;	/* watchdog timer, must never fire */
	/* per-socket bookkeeping, one instance for each end of the socketpair */
	struct test_event_fd2_sock {
		struct test_event_fd2_state *state;	/* back pointer to the owner */
		int fd;
		struct tevent_fd *fde;
		size_t num_written;	/* bytes written to this fd so far */
		size_t num_read;	/* bytes read from this fd so far */
		bool got_full;		/* set on the first event without TEVENT_FD_WRITE */
	} sock0, sock1;
	bool finished;			/* tells the main loop to stop */
	const char *error;		/* __location__ of the failed check, NULL on success */
};
498
/*
 * fd event handler shared by both sockets of test_event_fd2();
 * private_data is the struct test_event_fd2_sock of the socket that fired.
 *
 * Fill phase: while the socket is reported writable and has not been seen
 * "full" yet, one byte (the low 8 bits of num_written) is written per call.
 * The first call without TEVENT_FD_WRITE marks the socket as full; it then
 * either parks itself until the other socket is full as well, or wakes the
 * parked other socket up again.
 *
 * Drain phase: one byte is read per call and compared against the low
 * 8 bits of num_read. When everything the peer wrote has been consumed the
 * write flag is dropped, and the test finishes once both directions are
 * done.
 *
 * Every failed expectation stops the test and records __location__ in
 * state->error.
 */
static void test_event_fd2_sock_handler(struct tevent_context *ev_ctx,
					struct tevent_fd *fde,
					uint16_t flags,
					void *private_data)
{
	struct test_event_fd2_sock *cur_sock =
		(struct test_event_fd2_sock *)private_data;
	struct test_event_fd2_state *state = cur_sock->state;
	struct test_event_fd2_sock *oth_sock = NULL;
	uint8_t v = 0, c;
	ssize_t ret;

	/* work out which end of the socketpair is the peer */
	if (cur_sock == &state->sock0) {
		oth_sock = &state->sock1;
	} else {
		oth_sock = &state->sock0;
	}

	/*
	 * while the peer has only written its initial byte we must be
	 * both readable and writable
	 */
	if (oth_sock->num_written == 1) {
		if (flags != (TEVENT_FD_READ | TEVENT_FD_WRITE)) {
			state->finished = true;
			state->error = __location__;
			return;
		}
	}

	/* no event is expected once everything the peer wrote was consumed */
	if (cur_sock->num_read == oth_sock->num_written) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	/* there is unread data, so the event has to include readability */
	if (!(flags & TEVENT_FD_READ)) {
		state->finished = true;
		state->error = __location__;
		return;
	}

	if (oth_sock->num_read >= PIPE_BUF) {
		/*
		 * On Linux we become writable once we've read
		 * one byte. On Solaris we only become writable
		 * again once we've read 4096 bytes. PIPE_BUF
		 * is probably a safe bet to test against.
		 *
		 * There should be room to write a byte again
		 */
		if (!(flags & TEVENT_FD_WRITE)) {
			state->finished = true;
			state->error = __location__;
			return;
		}
	}

	/* fill phase: keep writing single bytes while the socket accepts them */
	if ((flags & TEVENT_FD_WRITE) && !cur_sock->got_full) {
		v = (uint8_t)cur_sock->num_written;
		ret = write(cur_sock->fd, &v, 1);
		if (ret != 1) {
			state->finished = true;
			state->error = __location__;
			return;
		}
		cur_sock->num_written++;
		/* sanity limit: the socket buffer should be full long before */
		if (cur_sock->num_written > 0x80000000) {
			state->finished = true;
			state->error = __location__;
			return;
		}
		return;
	}

	/* first event without TEVENT_FD_WRITE: the socket is considered full */
	if (!cur_sock->got_full) {
		cur_sock->got_full = true;

		if (!oth_sock->got_full) {
			/*
			 * cur_sock is full,
			 * lets wait for oth_sock
			 * to be filled
			 */
			tevent_fd_set_flags(cur_sock->fde, 0);
			return;
		}

		/*
		 * oth_sock waited for cur_sock,
		 * lets restart it
		 */
		tevent_fd_set_flags(oth_sock->fde,
				    TEVENT_FD_READ|TEVENT_FD_WRITE);
	}

	/* drain phase: consume one byte and verify the expected sequence value */
	ret = read(cur_sock->fd, &v, 1);
	if (ret != 1) {
		state->finished = true;
		state->error = __location__;
		return;
	}
	c = (uint8_t)cur_sock->num_read;
	if (c != v) {
		state->finished = true;
		state->error = __location__;
		return;
	}
	cur_sock->num_read++;

	if (cur_sock->num_read < oth_sock->num_written) {
		/* there is more to read */
		return;
	}
	/*
	 * we read everything, we need to remove TEVENT_FD_WRITE
	 * to avoid spinning
	 */
	TEVENT_FD_NOT_WRITEABLE(cur_sock->fde);

	if (oth_sock->num_read == cur_sock->num_written) {
		/*
		 * both directions are finished
		 */
		state->finished = true;
	}

	return;
}
624
625 static void test_event_fd2_finished(struct tevent_context *ev_ctx,
626                                     struct tevent_timer *te,
627                                     struct timeval tval,
628                                     void *private_data)
629 {
630         struct test_event_fd2_state *state =
631                 (struct test_event_fd2_state *)private_data;
632
633         /*
634          * this should never be triggered
635          */
636         state->finished = true;
637         state->error = __location__;
638 }
639
640 static bool test_event_fd2(struct torture_context *tctx,
641                            const void *test_data)
642 {
643         struct test_event_fd2_state state;
644         int sock[2];
645         uint8_t c = 0;
646
647         ZERO_STRUCT(state);
648         state.tctx = tctx;
649         state.backend = (const char *)test_data;
650
651         state.ev = test_tevent_context_init_byname(tctx, state.backend);
652         if (state.ev == NULL) {
653                 torture_skip(tctx, talloc_asprintf(tctx,
654                              "event backend '%s' not supported\n",
655                              state.backend));
656                 return true;
657         }
658
659         torture_comment(tctx, "backend '%s' - %s\n",
660                         state.backend, __FUNCTION__);
661
662         /*
663          * This tests the following
664          *
665          * - We write 1 byte to each socket
666          * - We wait for TEVENT_FD_READ/WRITE on both sockets
667          * - When we get TEVENT_FD_WRITE we write 1 byte
668          *   until both socket buffers are full, which
669          *   means both sockets only get TEVENT_FD_READ.
670          * - Then we read 1 byte until we have consumed
671          *   all bytes the other end has written.
672          */
673         sock[0] = -1;
674         sock[1] = -1;
675         socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
676
677         /*
678          * the timer should never expire
679          */
680         state.te = tevent_add_timer(state.ev, state.ev,
681                                     timeval_current_ofs(600, 0),
682                                     test_event_fd2_finished, &state);
683         state.sock0.state = &state;
684         state.sock0.fd = sock[0];
685         state.sock0.fde = tevent_add_fd(state.ev, state.ev,
686                                         state.sock0.fd,
687                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
688                                         test_event_fd2_sock_handler,
689                                         &state.sock0);
690         state.sock1.state = &state;
691         state.sock1.fd = sock[1];
692         state.sock1.fde = tevent_add_fd(state.ev, state.ev,
693                                         state.sock1.fd,
694                                         TEVENT_FD_READ | TEVENT_FD_WRITE,
695                                         test_event_fd2_sock_handler,
696                                         &state.sock1);
697
698         tevent_fd_set_auto_close(state.sock0.fde);
699         tevent_fd_set_auto_close(state.sock1.fde);
700
701         do_write(state.sock0.fd, &c, 1);
702         state.sock0.num_written++;
703         do_write(state.sock1.fd, &c, 1);
704         state.sock1.num_written++;
705
706         while (!state.finished) {
707                 errno = 0;
708                 if (tevent_loop_once(state.ev) == -1) {
709                         talloc_free(state.ev);
710                         torture_fail(tctx, talloc_asprintf(tctx,
711                                      "Failed event loop %s\n",
712                                      strerror(errno)));
713                 }
714         }
715
716         talloc_free(state.ev);
717
718         torture_assert(tctx, state.error == NULL, talloc_asprintf(tctx,
719                        "%s", state.error));
720
721         return true;
722 }
723
/* State handed to the test_wrapper_*() callbacks as private_data. */
struct test_wrapper_state {
	struct torture_context *tctx;	/* used by the callbacks for torture_comment() */
	int num_events;
	int num_wrap_handlers;		/* incremented once by every test_wrapper_*() call */
};
729
730 static bool test_wrapper_before_use(struct tevent_context *wrap_ev,
731                                     void *private_data,
732                                     struct tevent_context *main_ev,
733                                     const char *location)
734 {
735         struct test_wrapper_state *state =
736                 talloc_get_type_abort(private_data,
737                 struct test_wrapper_state);
738
739         torture_comment(state->tctx, "%s\n", __func__);
740         state->num_wrap_handlers++;
741         return true;
742 }
743
744 static void test_wrapper_after_use(struct tevent_context *wrap_ev,
745                                    void *private_data,
746                                    struct tevent_context *main_ev,
747                                    const char *location)
748 {
749         struct test_wrapper_state *state =
750                 talloc_get_type_abort(private_data,
751                 struct test_wrapper_state);
752
753         torture_comment(state->tctx, "%s\n", __func__);
754         state->num_wrap_handlers++;
755 }
756
757 static void test_wrapper_before_fd_handler(struct tevent_context *wrap_ev,
758                                            void *private_data,
759                                            struct tevent_context *main_ev,
760                                            struct tevent_fd *fde,
761                                            uint16_t flags,
762                                            const char *handler_name,
763                                            const char *location)
764 {
765         struct test_wrapper_state *state =
766                 talloc_get_type_abort(private_data,
767                 struct test_wrapper_state);
768
769         torture_comment(state->tctx, "%s\n", __func__);
770         state->num_wrap_handlers++;
771 }
772
773 static void test_wrapper_after_fd_handler(struct tevent_context *wrap_ev,
774                                           void *private_data,
775                                           struct tevent_context *main_ev,
776                                           struct tevent_fd *fde,
777                                           uint16_t flags,
778                                           const char *handler_name,
779                                           const char *location)
780 {
781         struct test_wrapper_state *state =
782                 talloc_get_type_abort(private_data,
783                 struct test_wrapper_state);
784
785         torture_comment(state->tctx, "%s\n", __func__);
786         state->num_wrap_handlers++;
787 }
788
789 static void test_wrapper_before_timer_handler(struct tevent_context *wrap_ev,
790                                               void *private_data,
791                                               struct tevent_context *main_ev,
792                                               struct tevent_timer *te,
793                                               struct timeval requested_time,
794                                               struct timeval trigger_time,
795                                               const char *handler_name,
796                                               const char *location)
797 {
798         struct test_wrapper_state *state =
799                 talloc_get_type_abort(private_data,
800                 struct test_wrapper_state);
801
802         torture_comment(state->tctx, "%s\n", __func__);
803         state->num_wrap_handlers++;
804 }
805
806 static void test_wrapper_after_timer_handler(struct tevent_context *wrap_ev,
807                                              void *private_data,
808                                              struct tevent_context *main_ev,
809                                              struct tevent_timer *te,
810                                              struct timeval requested_time,
811                                              struct timeval trigger_time,
812                                              const char *handler_name,
813                                              const char *location)
814 {
815         struct test_wrapper_state *state =
816                 talloc_get_type_abort(private_data,
817                 struct test_wrapper_state);
818
819         torture_comment(state->tctx, "%s\n", __func__);
820         state->num_wrap_handlers++;
821 }
822
823 static void test_wrapper_before_immediate_handler(struct tevent_context *wrap_ev,
824                                                   void *private_data,
825                                                   struct tevent_context *main_ev,
826                                                   struct tevent_immediate *im,
827                                                   const char *handler_name,
828                                                   const char *location)
829 {
830         struct test_wrapper_state *state =
831                 talloc_get_type_abort(private_data,
832                 struct test_wrapper_state);
833
834         torture_comment(state->tctx, "%s\n", __func__);
835         state->num_wrap_handlers++;
836 }
837
838 static void test_wrapper_after_immediate_handler(struct tevent_context *wrap_ev,
839                                                  void *private_data,
840                                                  struct tevent_context *main_ev,
841                                                  struct tevent_immediate *im,
842                                                  const char *handler_name,
843                                                  const char *location)
844 {
845         struct test_wrapper_state *state =
846                 talloc_get_type_abort(private_data,
847                 struct test_wrapper_state);
848
849         torture_comment(state->tctx, "%s\n", __func__);
850         state->num_wrap_handlers++;
851 }
852
853 static void test_wrapper_before_signal_handler(struct tevent_context *wrap_ev,
854                                                void *private_data,
855                                                struct tevent_context *main_ev,
856                                                struct tevent_signal *se,
857                                                int signum,
858                                                int count,
859                                                void *siginfo,
860                                                const char *handler_name,
861                                                const char *location)
862 {
863         struct test_wrapper_state *state =
864                 talloc_get_type_abort(private_data,
865                 struct test_wrapper_state);
866
867         torture_comment(state->tctx, "%s\n", __func__);
868         state->num_wrap_handlers++;
869 }
870
871 static void test_wrapper_after_signal_handler(struct tevent_context *wrap_ev,
872                                               void *private_data,
873                                               struct tevent_context *main_ev,
874                                               struct tevent_signal *se,
875                                               int signum,
876                                               int count,
877                                               void *siginfo,
878                                               const char *handler_name,
879                                               const char *location)
880 {
881         struct test_wrapper_state *state =
882                 talloc_get_type_abort(private_data,
883                 struct test_wrapper_state);
884
885         torture_comment(state->tctx, "%s\n", __func__);
886         state->num_wrap_handlers++;
887 }
888
889 static const struct tevent_wrapper_ops test_wrapper_ops = {
890         .name                           = "test_wrapper",
891         .before_use                     = test_wrapper_before_use,
892         .after_use                      = test_wrapper_after_use,
893         .before_fd_handler              = test_wrapper_before_fd_handler,
894         .after_fd_handler               = test_wrapper_after_fd_handler,
895         .before_timer_handler           = test_wrapper_before_timer_handler,
896         .after_timer_handler            = test_wrapper_after_timer_handler,
897         .before_immediate_handler       = test_wrapper_before_immediate_handler,
898         .after_immediate_handler        = test_wrapper_after_immediate_handler,
899         .before_signal_handler          = test_wrapper_before_signal_handler,
900         .after_signal_handler           = test_wrapper_after_signal_handler,
901 };
902
903 static void test_wrapper_timer_handler(struct tevent_context *ev,
904                                        struct tevent_timer *te,
905                                        struct timeval tv,
906                                        void *private_data)
907 {
908         struct test_wrapper_state *state =
909                 (struct test_wrapper_state *)private_data;
910
911
912         torture_comment(state->tctx, "timer handler\n");
913
914         state->num_events++;
915         talloc_free(te);
916         return;
917 }
918
919 static void test_wrapper_fd_handler(struct tevent_context *ev,
920                                     struct tevent_fd *fde,
921                                     unsigned short fd_flags,
922                                     void *private_data)
923 {
924         struct test_wrapper_state *state =
925                 (struct test_wrapper_state *)private_data;
926
927         torture_comment(state->tctx, "fd handler\n");
928
929         state->num_events++;
930         talloc_free(fde);
931         return;
932 }
933
934 static void test_wrapper_immediate_handler(struct tevent_context *ev,
935                                            struct tevent_immediate *im,
936                                            void *private_data)
937 {
938         struct test_wrapper_state *state =
939                 (struct test_wrapper_state *)private_data;
940
941         state->num_events++;
942         talloc_free(im);
943
944         torture_comment(state->tctx, "immediate handler\n");
945         return;
946 }
947
948 static void test_wrapper_signal_handler(struct tevent_context *ev,
949                                         struct tevent_signal *se,
950                                         int signum,
951                                         int count,
952                                         void *siginfo,
953                                         void *private_data)
954 {
955         struct test_wrapper_state *state =
956                 (struct test_wrapper_state *)private_data;
957
958         torture_comment(state->tctx, "signal handler\n");
959
960         state->num_events++;
961         talloc_free(se);
962         return;
963 }
964
965 static bool test_wrapper(struct torture_context *tctx,
966                          const void *test_data)
967 {
968         struct test_wrapper_state *state = NULL;
969         int sock[2] = { -1, -1};
970         uint8_t c = 0;
971         const int num_events = 4;
972         const char *backend = (const char *)test_data;
973         struct tevent_context *ev = NULL;
974         struct tevent_context *wrap_ev = NULL;
975         struct tevent_fd *fde = NULL;
976         struct tevent_timer *te = NULL;
977         struct tevent_signal *se = NULL;
978         struct tevent_immediate *im = NULL;
979         int ret;
980         bool ok = false;
981         bool ret2;
982
983         ev = test_tevent_context_init_byname(tctx, backend);
984         if (ev == NULL) {
985                 torture_skip(tctx, talloc_asprintf(tctx,
986                              "event backend '%s' not supported\n",
987                              backend));
988                 return true;
989         }
990
991         torture_comment(tctx, "tevent backend '%s'\n", backend);
992
993         wrap_ev = tevent_context_wrapper_create(
994                 ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
995         torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
996                                      "tevent_context_wrapper_create failed\n");
997         *state = (struct test_wrapper_state) {
998                 .tctx = tctx,
999         };
1000
1001         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
1002         torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
1003
1004         te = tevent_add_timer(wrap_ev, wrap_ev,
1005                               timeval_current_ofs(0, 0),
1006                               test_wrapper_timer_handler, state);
1007         torture_assert_not_null_goto(tctx, te, ok, done,
1008                                      "tevent_add_timer failed\n");
1009
1010         fde = tevent_add_fd(wrap_ev, wrap_ev,
1011                             sock[1],
1012                             TEVENT_FD_READ,
1013                             test_wrapper_fd_handler,
1014                             state);
1015         torture_assert_not_null_goto(tctx, fde, ok, done,
1016                                      "tevent_add_fd failed\n");
1017
1018         im = tevent_create_immediate(wrap_ev);
1019         torture_assert_not_null_goto(tctx, im, ok, done,
1020                                      "tevent_create_immediate failed\n");
1021
1022         se = tevent_add_signal(wrap_ev, wrap_ev,
1023                                SIGUSR1,
1024                                0,
1025                                test_wrapper_signal_handler,
1026                                state);
1027         torture_assert_not_null_goto(tctx, se, ok, done,
1028                                      "tevent_add_signal failed\n");
1029
1030         do_write(sock[0], &c, 1);
1031         kill(getpid(), SIGUSR1);
1032         tevent_schedule_immediate(im,
1033                                   wrap_ev,
1034                                   test_wrapper_immediate_handler,
1035                                   state);
1036
1037         ret2 = tevent_context_push_use(wrap_ev);
1038         torture_assert_goto(tctx, ret2, ok, done, "tevent_context_push_use(wrap_ev) failed\n");
1039         ret2 = tevent_context_push_use(ev);
1040         torture_assert_goto(tctx, ret2, ok, pop_use, "tevent_context_push_use(ev) failed\n");
1041         tevent_context_pop_use(ev);
1042         tevent_context_pop_use(wrap_ev);
1043
1044         ret = tevent_loop_wait(ev);
1045         torture_assert_int_equal_goto(tctx, ret, 0, ok, done, "tevent_loop_wait failed\n");
1046
1047         torture_comment(tctx, "Num events: %d\n", state->num_events);
1048         torture_comment(tctx, "Num wrap handlers: %d\n",
1049                         state->num_wrap_handlers);
1050
1051         torture_assert_int_equal_goto(tctx, state->num_events, num_events, ok, done,
1052                                       "Wrong event count\n");
1053         torture_assert_int_equal_goto(tctx, state->num_wrap_handlers,
1054                                       num_events*2+2,
1055                                       ok, done, "Wrong wrapper count\n");
1056
1057         ok = true;
1058
1059 done:
1060         TALLOC_FREE(wrap_ev);
1061         TALLOC_FREE(ev);
1062
1063         if (sock[0] != -1) {
1064                 close(sock[0]);
1065         }
1066         if (sock[1] != -1) {
1067                 close(sock[1]);
1068         }
1069         return ok;
1070 pop_use:
1071         tevent_context_pop_use(wrap_ev);
1072         goto done;
1073 }
1074
1075 static void test_free_wrapper_signal_handler(struct tevent_context *ev,
1076                                         struct tevent_signal *se,
1077                                         int signum,
1078                                         int count,
1079                                         void *siginfo,
1080                                         void *private_data)
1081 {
1082         struct torture_context *tctx =
1083                 talloc_get_type_abort(private_data,
1084                 struct torture_context);
1085
1086         torture_comment(tctx, "signal handler\n");
1087
1088         talloc_free(se);
1089
1090         /*
1091          * signal handlers have highest priority in tevent, so this signal
1092          * handler will always be started before the other handlers
1093          * below. Freeing the (wrapper) event context here tests that the
1094          * wrapper implementation correclty handles the wrapper ev going away
1095          * with pending events.
1096          */
1097         talloc_free(ev);
1098         return;
1099 }
1100
/*
 * fd handler registered by test_free_wrapper().
 *
 * It must never run: test_free_wrapper_signal_handler() is expected to
 * have destroyed the wrapper tevent_context first, so reaching this point
 * aborts the process.
 */
static void test_free_wrapper_fd_handler(struct tevent_context *ev,
                                         struct tevent_fd *fde,
                                         unsigned short fd_flags,
                                         void *private_data)
{
        abort();
}
1113
/*
 * Immediate handler registered by test_free_wrapper().
 *
 * It must never run: test_free_wrapper_signal_handler() is expected to
 * have destroyed the wrapper tevent_context first, so reaching this point
 * aborts the process.
 */
static void test_free_wrapper_immediate_handler(struct tevent_context *ev,
                                           struct tevent_immediate *im,
                                           void *private_data)
{
        abort();
}
1125
/*
 * Timer handler registered by test_free_wrapper().
 *
 * It must never run: test_free_wrapper_signal_handler() is expected to
 * have destroyed the wrapper tevent_context first, so reaching this point
 * aborts the process.
 */
static void test_free_wrapper_timer_handler(struct tevent_context *ev,
                                       struct tevent_timer *te,
                                       struct timeval tv,
                                       void *private_data)
{
        abort();
}
1138
1139 static bool test_free_wrapper(struct torture_context *tctx,
1140                               const void *test_data)
1141 {
1142         struct test_wrapper_state *state = NULL;
1143         int sock[2] = { -1, -1};
1144         uint8_t c = 0;
1145         const char *backend = (const char *)test_data;
1146         TALLOC_CTX *frame = talloc_stackframe();
1147         struct tevent_context *ev = NULL;
1148         struct tevent_context *wrap_ev = NULL;
1149         struct tevent_fd *fde = NULL;
1150         struct tevent_timer *te = NULL;
1151         struct tevent_signal *se = NULL;
1152         struct tevent_immediate *im = NULL;
1153         int ret;
1154         bool ok = false;
1155
1156         ev = test_tevent_context_init_byname(frame, backend);
1157         if (ev == NULL) {
1158                 torture_skip(tctx, talloc_asprintf(tctx,
1159                              "event backend '%s' not supported\n",
1160                              backend));
1161                 return true;
1162         }
1163
1164         torture_comment(tctx, "tevent backend '%s'\n", backend);
1165
1166         wrap_ev = tevent_context_wrapper_create(
1167                 ev, ev, &test_wrapper_ops, &state, struct test_wrapper_state);
1168         torture_assert_not_null_goto(tctx, wrap_ev, ok, done,
1169                                      "tevent_context_wrapper_create failed\n");
1170         *state = (struct test_wrapper_state) {
1171                 .tctx = tctx,
1172         };
1173
1174         ret = socketpair(AF_UNIX, SOCK_STREAM, 0, sock);
1175         torture_assert_goto(tctx, ret == 0, ok, done, "socketpair failed\n");
1176
1177         fde = tevent_add_fd(wrap_ev, frame,
1178                             sock[1],
1179                             TEVENT_FD_READ,
1180                             test_free_wrapper_fd_handler,
1181                             NULL);
1182         torture_assert_not_null_goto(tctx, fde, ok, done,
1183                                      "tevent_add_fd failed\n");
1184
1185         te = tevent_add_timer(wrap_ev, frame,
1186                               timeval_current_ofs(0, 0),
1187                               test_free_wrapper_timer_handler, NULL);
1188         torture_assert_not_null_goto(tctx, te, ok, done,
1189                                      "tevent_add_timer failed\n");
1190
1191         im = tevent_create_immediate(frame);
1192         torture_assert_not_null_goto(tctx, im, ok, done,
1193                                      "tevent_create_immediate failed\n");
1194
1195         se = tevent_add_signal(wrap_ev, frame,
1196                                SIGUSR1,
1197                                0,
1198                                test_free_wrapper_signal_handler,
1199                                tctx);
1200         torture_assert_not_null_goto(tctx, se, ok, done,
1201                                      "tevent_add_signal failed\n");
1202
1203         do_write(sock[0], &c, 1);
1204         kill(getpid(), SIGUSR1);
1205         tevent_schedule_immediate(im,
1206                                   wrap_ev,
1207                                   test_free_wrapper_immediate_handler,
1208                                   NULL);
1209
1210         ret = tevent_loop_wait(ev);
1211         torture_assert_goto(tctx, ret == 0, ok, done, "tevent_loop_wait failed\n");
1212
1213         ok = true;
1214
1215 done:
1216         TALLOC_FREE(frame);
1217
1218         if (sock[0] != -1) {
1219                 close(sock[0]);
1220         }
1221         if (sock[1] != -1) {
1222                 close(sock[1]);
1223         }
1224         return ok;
1225 }
1226
1227 #ifdef HAVE_PTHREAD
1228
/*
 * Flag polled by test_event_poll_thread() after every loop iteration; the
 * main test thread sets it while holding threaded_mutex.
 */
static bool do_shutdown = false;

/* Lock taken/released by test_event_threaded_lock()/_unlock(). */
static pthread_mutex_t threaded_mutex = PTHREAD_MUTEX_INITIALIZER;
1231
1232 static void test_event_threaded_lock(void)
1233 {
1234         int ret;
1235         ret = pthread_mutex_lock(&threaded_mutex);
1236         assert(ret == 0);
1237 }
1238
1239 static void test_event_threaded_unlock(void)
1240 {
1241         int ret;
1242         ret = pthread_mutex_unlock(&threaded_mutex);
1243         assert(ret == 0);
1244 }
1245
1246 static void test_event_threaded_trace(enum tevent_trace_point point,
1247                                       void *private_data)
1248 {
1249         switch (point) {
1250         case TEVENT_TRACE_BEFORE_WAIT:
1251                 test_event_threaded_unlock();
1252                 break;
1253         case TEVENT_TRACE_AFTER_WAIT:
1254                 test_event_threaded_lock();
1255                 break;
1256         case TEVENT_TRACE_BEFORE_LOOP_ONCE:
1257         case TEVENT_TRACE_AFTER_LOOP_ONCE:
1258                 break;
1259         }
1260 }
1261
/* Timer callback for the poll_mt test; intentionally does nothing. */
static void test_event_threaded_timer(struct tevent_context *ev,
                                      struct tevent_timer *te,
                                      struct timeval current_time,
                                      void *private_data)
{
}
1269
1270 static void *test_event_poll_thread(void *private_data)
1271 {
1272         struct tevent_context *ev = (struct tevent_context *)private_data;
1273
1274         test_event_threaded_lock();
1275
1276         while (true) {
1277                 int ret;
1278                 ret = tevent_loop_once(ev);
1279                 assert(ret == 0);
1280                 if (do_shutdown) {
1281                         test_event_threaded_unlock();
1282                         return NULL;
1283                 }
1284         }
1285
1286 }
1287
1288 static void test_event_threaded_read_handler(struct tevent_context *ev,
1289                                              struct tevent_fd *fde,
1290                                              uint16_t flags,
1291                                              void *private_data)
1292 {
1293         int *pfd = (int *)private_data;
1294         char c;
1295         ssize_t nread;
1296
1297         if ((flags & TEVENT_FD_READ) == 0) {
1298                 return;
1299         }
1300
1301         do {
1302                 nread = read(*pfd, &c, 1);
1303         } while ((nread == -1) && (errno == EINTR));
1304
1305         assert(nread == 1);
1306 }
1307
1308 static bool test_event_context_threaded(struct torture_context *test,
1309                                         const void *test_data)
1310 {
1311         struct tevent_context *ev;
1312         struct tevent_timer *te;
1313         struct tevent_fd *fde;
1314         pthread_t poll_thread;
1315         int fds[2];
1316         int ret;
1317         char c = 0;
1318
1319         ev = test_tevent_context_init_byname(test, "poll_mt");
1320         torture_assert(test, ev != NULL, "poll_mt not supported");
1321
1322         tevent_set_trace_callback(ev, test_event_threaded_trace, NULL);
1323
1324         te = tevent_add_timer(ev, ev, timeval_current_ofs(5, 0),
1325                               test_event_threaded_timer, NULL);
1326         torture_assert(test, te != NULL, "Could not add timer");
1327
1328         ret = pthread_create(&poll_thread, NULL, test_event_poll_thread, ev);
1329         torture_assert(test, ret == 0, "Could not create poll thread");
1330
1331         ret = pipe(fds);
1332         torture_assert(test, ret == 0, "Could not create pipe");
1333
1334         poll(NULL, 0, 100);
1335
1336         test_event_threaded_lock();
1337
1338         fde = tevent_add_fd(ev, ev, fds[0], TEVENT_FD_READ,
1339                             test_event_threaded_read_handler, &fds[0]);
1340         torture_assert(test, fde != NULL, "Could not add fd event");
1341
1342         test_event_threaded_unlock();
1343
1344         poll(NULL, 0, 100);
1345
1346         do_write(fds[1], &c, 1);
1347
1348         poll(NULL, 0, 100);
1349
1350         test_event_threaded_lock();
1351         do_shutdown = true;
1352         test_event_threaded_unlock();
1353
1354         do_write(fds[1], &c, 1);
1355
1356         ret = pthread_join(poll_thread, NULL);
1357         torture_assert(test, ret == 0, "pthread_join failed");
1358
1359         return true;
1360 }
1361
/* Number of worker threads started by the multi-threaded tests. */
#define NUM_TEVENT_THREADS 100

/*
 * File-level state shared by the multi-threaded tests.
 * Ugly, but needed so the callbacks can call torture_comment().
 */
static struct torture_context *thread_test_ctx;

/* Thread ids, indexed in creation order. */
static pthread_t thread_map[NUM_TEVENT_THREADS];

/* Number of callbacks counted so far by the master thread. */
static unsigned thread_counter;
1368
1369 /* Called in master thread context */
1370 static void callback_nowait(struct tevent_context *ev,
1371                                 struct tevent_immediate *im,
1372                                 void *private_ptr)
1373 {
1374         pthread_t *thread_id_ptr =
1375                 talloc_get_type_abort(private_ptr, pthread_t);
1376         unsigned i;
1377
1378         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1379                 if (pthread_equal(*thread_id_ptr,
1380                                 thread_map[i])) {
1381                         break;
1382                 }
1383         }
1384         torture_comment(thread_test_ctx,
1385                         "Callback %u from thread %u\n",
1386                         thread_counter,
1387                         i);
1388         thread_counter++;
1389 }
1390
1391 /* Blast the master tevent_context with a callback, no waiting. */
1392 static void *thread_fn_nowait(void *private_ptr)
1393 {
1394         struct tevent_thread_proxy *master_tp =
1395                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1396         struct tevent_immediate *im;
1397         pthread_t *thread_id_ptr;
1398
1399         im = tevent_create_immediate(NULL);
1400         if (im == NULL) {
1401                 return NULL;
1402         }
1403         thread_id_ptr = talloc(NULL, pthread_t);
1404         if (thread_id_ptr == NULL) {
1405                 return NULL;
1406         }
1407         *thread_id_ptr = pthread_self();
1408
1409         tevent_thread_proxy_schedule(master_tp,
1410                                 &im,
1411                                 callback_nowait,
1412                                 &thread_id_ptr);
1413         return NULL;
1414 }
1415
1416 static void timeout_fn(struct tevent_context *ev,
1417                         struct tevent_timer *te,
1418                         struct timeval tv, void *p)
1419 {
1420         thread_counter = NUM_TEVENT_THREADS * 10;
1421 }
1422
1423 static bool test_multi_tevent_threaded(struct torture_context *test,
1424                                         const void *test_data)
1425 {
1426         unsigned i;
1427         struct tevent_context *master_ev;
1428         struct tevent_thread_proxy *tp;
1429
1430         talloc_disable_null_tracking();
1431
1432         /* Ugly global stuff. */
1433         thread_test_ctx = test;
1434         thread_counter = 0;
1435
1436         master_ev = test_tevent_context_init(NULL);
1437         if (master_ev == NULL) {
1438                 return false;
1439         }
1440
1441         tp = tevent_thread_proxy_create(master_ev);
1442         if (tp == NULL) {
1443                 torture_fail(test,
1444                         talloc_asprintf(test,
1445                                 "tevent_thread_proxy_create failed\n"));
1446                 talloc_free(master_ev);
1447                 return false;
1448         }
1449
1450         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1451                 int ret = pthread_create(&thread_map[i],
1452                                 NULL,
1453                                 thread_fn_nowait,
1454                                 tp);
1455                 if (ret != 0) {
1456                         torture_fail(test,
1457                                 talloc_asprintf(test,
1458                                         "Failed to create thread %i, %d\n",
1459                                         i, ret));
1460                         return false;
1461                 }
1462         }
1463
1464         /* Ensure we don't wait more than 10 seconds. */
1465         tevent_add_timer(master_ev,
1466                         master_ev,
1467                         timeval_current_ofs(10,0),
1468                         timeout_fn,
1469                         NULL);
1470
1471         while (thread_counter < NUM_TEVENT_THREADS) {
1472                 int ret = tevent_loop_once(master_ev);
1473                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1474         }
1475
1476         torture_assert(test, thread_counter == NUM_TEVENT_THREADS,
1477                 "thread_counter fail\n");
1478
1479         talloc_free(master_ev);
1480         return true;
1481 }
1482
/*
 * Message passed from a worker thread (thread_fn_1) to the master and
 * back again.
 */
struct reply_state {
        /* Proxy the master uses to schedule thread_callback(). */
        struct tevent_thread_proxy *reply_tp;
        /* Id of the sending thread, matched against thread_map. */
        pthread_t thread_id;
        /* Points at the sender's "finished" flag. */
        int *p_finished;
};
1488
/*
 * Timer callback armed by thread_fn_1(): p points at the thread's
 * "finished" flag, which is set to 2 to signal a timeout.
 */
static void thread_timeout_fn(struct tevent_context *ev,
                        struct tevent_timer *te,
                        struct timeval tv, void *p)
{
        *(int *)p = 2;
}
1497
1498 /* Called in child-thread context */
1499 static void thread_callback(struct tevent_context *ev,
1500                                 struct tevent_immediate *im,
1501                                 void *private_ptr)
1502 {
1503         struct reply_state *rsp =
1504                 talloc_get_type_abort(private_ptr, struct reply_state);
1505
1506         talloc_steal(ev, rsp);
1507         *rsp->p_finished = 1;
1508 }
1509
1510 /* Called in master thread context */
1511 static void master_callback(struct tevent_context *ev,
1512                                 struct tevent_immediate *im,
1513                                 void *private_ptr)
1514 {
1515         struct reply_state *rsp =
1516                 talloc_get_type_abort(private_ptr, struct reply_state);
1517         unsigned i;
1518
1519         talloc_steal(ev, rsp);
1520
1521         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1522                 if (pthread_equal(rsp->thread_id,
1523                                 thread_map[i])) {
1524                         break;
1525                 }
1526         }
1527         torture_comment(thread_test_ctx,
1528                         "Callback %u from thread %u\n",
1529                         thread_counter,
1530                         i);
1531         /* Now reply to the thread ! */
1532         tevent_thread_proxy_schedule(rsp->reply_tp,
1533                                 &im,
1534                                 thread_callback,
1535                                 &rsp);
1536
1537         thread_counter++;
1538 }
1539
1540 static void *thread_fn_1(void *private_ptr)
1541 {
1542         struct tevent_thread_proxy *master_tp =
1543                 talloc_get_type_abort(private_ptr, struct tevent_thread_proxy);
1544         struct tevent_thread_proxy *tp;
1545         struct tevent_immediate *im;
1546         struct tevent_context *ev;
1547         struct reply_state *rsp;
1548         int finished = 0;
1549         int ret;
1550
1551         ev = tevent_context_init(NULL);
1552         if (ev == NULL) {
1553                 return NULL;
1554         }
1555
1556         tp = tevent_thread_proxy_create(ev);
1557         if (tp == NULL) {
1558                 talloc_free(ev);
1559                 return NULL;
1560         }
1561
1562         im = tevent_create_immediate(ev);
1563         if (im == NULL) {
1564                 talloc_free(ev);
1565                 return NULL;
1566         }
1567
1568         rsp = talloc(ev, struct reply_state);
1569         if (rsp == NULL) {
1570                 talloc_free(ev);
1571                 return NULL;
1572         }
1573
1574         rsp->thread_id = pthread_self();
1575         rsp->reply_tp = tp;
1576         rsp->p_finished = &finished;
1577
1578         /* Introduce a little randomness into the mix.. */
1579         usleep(random() % 7000);
1580
1581         tevent_thread_proxy_schedule(master_tp,
1582                                 &im,
1583                                 master_callback,
1584                                 &rsp);
1585
1586         /* Ensure we don't wait more than 10 seconds. */
1587         tevent_add_timer(ev,
1588                         ev,
1589                         timeval_current_ofs(10,0),
1590                         thread_timeout_fn,
1591                         &finished);
1592
1593         while (finished == 0) {
1594                 ret = tevent_loop_once(ev);
1595                 assert(ret == 0);
1596         }
1597
1598         if (finished > 1) {
1599                 /* Timeout ! */
1600                 abort();
1601         }
1602
1603         /*
1604          * NB. We should talloc_free(ev) here, but if we do
1605          * we currently get hit by helgrind Fix #323432
1606          * "When calling pthread_cond_destroy or pthread_mutex_destroy
1607          * with initializers as argument Helgrind (incorrectly) reports errors."
1608          *
1609          * http://valgrind.10908.n7.nabble.com/Helgrind-3-9-0-false-positive-
1610          * with-pthread-mutex-destroy-td47757.html
1611          *
1612          * Helgrind doesn't understand that the request/reply
1613          * messages provide synchronization between the lock/unlock
1614          * in tevent_thread_proxy_schedule(), and the pthread_destroy()
1615          * when the struct tevent_thread_proxy object is talloc_free'd.
1616          *
1617          * As a work-around for now return ev for the parent thread to free.
1618          */
1619         return ev;
1620 }
1621
1622 static bool test_multi_tevent_threaded_1(struct torture_context *test,
1623                                         const void *test_data)
1624 {
1625         unsigned i;
1626         struct tevent_context *master_ev;
1627         struct tevent_thread_proxy *master_tp;
1628         int ret;
1629
1630         talloc_disable_null_tracking();
1631
1632         /* Ugly global stuff. */
1633         thread_test_ctx = test;
1634         thread_counter = 0;
1635
1636         master_ev = test_tevent_context_init(NULL);
1637         if (master_ev == NULL) {
1638                 return false;
1639         }
1640
1641         master_tp = tevent_thread_proxy_create(master_ev);
1642         if (master_tp == NULL) {
1643                 torture_fail(test,
1644                         talloc_asprintf(test,
1645                                 "tevent_thread_proxy_create failed\n"));
1646                 talloc_free(master_ev);
1647                 return false;
1648         }
1649
1650         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1651                 ret = pthread_create(&thread_map[i],
1652                                 NULL,
1653                                 thread_fn_1,
1654                                 master_tp);
1655                 if (ret != 0) {
1656                         torture_fail(test,
1657                                 talloc_asprintf(test,
1658                                         "Failed to create thread %i, %d\n",
1659                                         i, ret));
1660                                 return false;
1661                 }
1662         }
1663
1664         while (thread_counter < NUM_TEVENT_THREADS) {
1665                 ret = tevent_loop_once(master_ev);
1666                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1667         }
1668
1669         /* Wait for all the threads to finish - join 'em. */
1670         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1671                 void *retval;
1672                 ret = pthread_join(thread_map[i], &retval);
1673                 torture_assert(test, ret == 0, "pthread_join failed");
1674                 /* Free the child thread event context. */
1675                 talloc_free(retval);
1676         }
1677
1678         talloc_free(master_ev);
1679         return true;
1680 }
1681
/*
 * Per-thread state for the multi_tevent_threaded_2 test. A pointer to one
 * of these is passed to thread_fn_2() and handed on as the private data of
 * master_callback_2().
 */
struct threaded_test_2 {
        /* Threaded context the worker uses to schedule its immediate. */
        struct tevent_threaded_context *tctx;
        /* Immediate event scheduled by the worker. */
        struct tevent_immediate *im;
        /* Set by the worker to pthread_self(). */
        pthread_t thread_id;
};
1687
1688 static void master_callback_2(struct tevent_context *ev,
1689                               struct tevent_immediate *im,
1690                               void *private_data);
1691
1692 static void *thread_fn_2(void *private_data)
1693 {
1694         struct threaded_test_2 *state = private_data;
1695
1696         state->thread_id = pthread_self();
1697
1698         usleep(random() % 7000);
1699
1700         tevent_threaded_schedule_immediate(
1701                 state->tctx, state->im, master_callback_2, state);
1702
1703         return NULL;
1704 }
1705
1706 static void master_callback_2(struct tevent_context *ev,
1707                               struct tevent_immediate *im,
1708                               void *private_data)
1709 {
1710         struct threaded_test_2 *state = private_data;
1711         int i;
1712
1713         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1714                 if (pthread_equal(state->thread_id, thread_map[i])) {
1715                         break;
1716                 }
1717         }
1718         torture_comment(thread_test_ctx,
1719                         "Callback_2 %u from thread %u\n",
1720                         thread_counter,
1721                         i);
1722         thread_counter++;
1723 }
1724
1725 static bool test_multi_tevent_threaded_2(struct torture_context *test,
1726                                          const void *test_data)
1727 {
1728         unsigned i;
1729
1730         struct tevent_context *ev;
1731         struct tevent_threaded_context *tctx;
1732         int ret;
1733
1734         thread_test_ctx = test;
1735         thread_counter = 0;
1736
1737         ev = test_tevent_context_init(test);
1738         torture_assert(test, ev != NULL, "tevent_context_init failed");
1739
1740         /*
1741          * tevent_re_initialise used to have a bug where it did not
1742          * re-initialise the thread support after taking it
1743          * down. Exercise that code path.
1744          */
1745         ret = tevent_re_initialise(ev);
1746         torture_assert(test, ret == 0, "tevent_re_initialise failed");
1747
1748         tctx = tevent_threaded_context_create(ev, ev);
1749         torture_assert(test, tctx != NULL,
1750                        "tevent_threaded_context_create failed");
1751
1752         for (i=0; i<NUM_TEVENT_THREADS; i++) {
1753                 struct threaded_test_2 *state;
1754
1755                 state = talloc(ev, struct threaded_test_2);
1756                 torture_assert(test, state != NULL, "talloc failed");
1757
1758                 state->tctx = tctx;
1759                 state->im = tevent_create_immediate(state);
1760                 torture_assert(test, state->im != NULL,
1761                                "tevent_create_immediate failed");
1762
1763                 ret = pthread_create(&thread_map[i], NULL, thread_fn_2, state);
1764                 torture_assert(test, ret == 0, "pthread_create failed");
1765         }
1766
1767         while (thread_counter < NUM_TEVENT_THREADS) {
1768                 ret = tevent_loop_once(ev);
1769                 torture_assert(test, ret == 0, "tevent_loop_once failed");
1770         }
1771
1772         /* Wait for all the threads to finish - join 'em. */
1773         for (i = 0; i < NUM_TEVENT_THREADS; i++) {
1774                 void *retval;
1775                 ret = pthread_join(thread_map[i], &retval);
1776                 torture_assert(test, ret == 0, "pthread_join failed");
1777                 /* Free the child thread event context. */
1778         }
1779
1780         talloc_free(tctx);
1781         talloc_free(ev);
1782         return true;
1783 }
1784
/* Results reported back by test_cached_pid_thread(). */
struct test_cached_pid_thread_state {
        /* Value of tevent_cached_getpid() as seen inside the thread. */
        pid_t thread_cached_pid;
        /* Value of getpid() as seen inside the thread. */
        pid_t thread_pid;
};
1789
1790 static void *test_cached_pid_thread(void *private_data)
1791 {
1792         struct test_cached_pid_thread_state *state =
1793                 (struct test_cached_pid_thread_state *)private_data;
1794
1795         state->thread_cached_pid = tevent_cached_getpid();
1796         state->thread_pid = getpid();
1797
1798         return NULL;
1799 }
1800 #endif
1801
/*
 * Check that tevent_cached_getpid() matches getpid():
 *  - in the calling process,
 *  - inside a freshly created thread (only with HAVE_PTHREAD),
 *  - inside a forked child, where both must differ from the parent's pid.
 *
 * test_data is unused. Returns true on success; any failed torture_assert
 * ends the test early.
 */
static bool test_cached_pid(struct torture_context *test,
                            const void *test_data)
{
        pid_t parent_pid = getpid();
        pid_t child_pid;
        pid_t finished_pid;
        int child_status;

        torture_assert(test, tevent_cached_getpid() == parent_pid, "tevent_cached_getpid()");

#ifdef HAVE_PTHREAD
        {
                struct test_cached_pid_thread_state state = { .thread_cached_pid = -1, };
                void *retval = NULL;
                pthread_t tid;
                int ret;

                ret = pthread_create(&tid, NULL, test_cached_pid_thread, &state);
                torture_assert(test, ret == 0, "pthread_create failed");

                ret = pthread_join(tid, &retval);
                torture_assert(test, ret == 0, "pthread_join failed");

                torture_assert(test, state.thread_pid == parent_pid, "getpid() in thread");
                torture_assert(test, state.thread_cached_pid == parent_pid, "tevent_cached_getpid() in thread");
        }
#endif /* HAVE_PTHREAD */

        child_pid = fork();
        if (child_pid == 0) {
                /*
                 * Child: report the outcome through the exit status.
                 * 1 = pid did not change, 2 = cached pid is stale.
                 */
                const pid_t own_pid = getpid();
                const pid_t own_cached_pid = tevent_cached_getpid();
                int exit_code = 0;

                if (parent_pid == own_pid) {
                        exit_code = 1;
                } else if (own_pid != own_cached_pid) {
                        exit_code = 2;
                }
                exit(exit_code);
        }
        torture_assert(test, child_pid > 0, "fork failed");

        /* Parent: reap the child and require a clean exit. */
        finished_pid = waitpid(child_pid, &child_status, 0);
        torture_assert(test, finished_pid == child_pid, "wrong child");
        torture_assert(test, child_status == 0, "child_status");

        return true;
}
1852
1853 struct torture_suite *torture_local_event(TALLOC_CTX *mem_ctx)
1854 {
1855         struct torture_suite *suite = torture_suite_create(mem_ctx, "event");
1856         const char **list = tevent_backend_list(suite);
1857         int i;
1858
1859         for (i=0;list && list[i];i++) {
1860                 struct torture_suite *backend_suite;
1861
1862                 backend_suite = torture_suite_create(mem_ctx, list[i]);
1863
1864                 torture_suite_add_simple_tcase_const(backend_suite,
1865                                                "context",
1866                                                test_event_context,
1867                                                (const void *)list[i]);
1868                 torture_suite_add_simple_tcase_const(backend_suite,
1869                                                "fd1",
1870                                                test_event_fd1,
1871                                                (const void *)list[i]);
1872                 torture_suite_add_simple_tcase_const(backend_suite,
1873                                                "fd2",
1874                                                test_event_fd2,
1875                                                (const void *)list[i]);
1876                 torture_suite_add_simple_tcase_const(backend_suite,
1877                                                "wrapper",
1878                                                test_wrapper,
1879                                                (const void *)list[i]);
1880                 torture_suite_add_simple_tcase_const(backend_suite,
1881                                                "free_wrapper",
1882                                                test_free_wrapper,
1883                                                (const void *)list[i]);
1884
1885                 torture_suite_add_suite(suite, backend_suite);
1886         }
1887
1888 #ifdef HAVE_PTHREAD
1889         torture_suite_add_simple_tcase_const(suite, "threaded_poll_mt",
1890                                              test_event_context_threaded,
1891                                              NULL);
1892
1893         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded",
1894                                              test_multi_tevent_threaded,
1895                                              NULL);
1896
1897         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_1",
1898                                              test_multi_tevent_threaded_1,
1899                                              NULL);
1900
1901         torture_suite_add_simple_tcase_const(suite, "multi_tevent_threaded_2",
1902                                              test_multi_tevent_threaded_2,
1903                                              NULL);
1904
1905 #endif
1906
1907         torture_suite_add_simple_tcase_const(suite, "tevent_cached_getpid",
1908                                              test_cached_pid,
1909                                              NULL);
1910
1911         return suite;
1912 }