lib/tevent: Add a thread-safe tevent backend
authorVolker Lendecke <vl@samba.org>
Mon, 13 Aug 2012 14:06:01 +0000 (16:06 +0200)
committerStefan Metzmacher <metze@samba.org>
Thu, 16 Aug 2012 18:49:11 +0000 (20:49 +0200)
Signed-off-by: Stefan Metzmacher <metze@samba.org>
lib/tevent/tevent.c
lib/tevent/tevent_internal.h
lib/tevent/tevent_poll.c

index 61ffc7edaa5683e03f4d57c20eb6d5d58e077b30..fa842e42086bc88d033f71df41ea9c6c991548b6 100644 (file)
@@ -114,6 +114,7 @@ static void tevent_backend_init(void)
 {
        tevent_select_init();
        tevent_poll_init();
+       tevent_poll_mt_init();
        tevent_standard_init();
 #ifdef HAVE_EPOLL
        tevent_epoll_init();
index 877510f9f43b67b11d925b9efb25991b3fbad6d2..f09cf576b2796e9bb0e7982a5fcdec903f9bdebf 100644 (file)
@@ -315,6 +315,7 @@ void tevent_cleanup_pending_signal_handlers(struct tevent_signal *se);
 bool tevent_standard_init(void);
 bool tevent_select_init(void);
 bool tevent_poll_init(void);
+bool tevent_poll_mt_init(void);
 #ifdef HAVE_EPOLL
 bool tevent_epoll_init(void);
 #endif
index 2639143998926978cc3b9015adb5184c6cfd4f68..da8cc0137acf0dd158acdb147d90af8d9967d09f 100644 (file)
@@ -34,7 +34,8 @@ struct poll_event_context {
        struct tevent_context *ev;
 
        /*
-        * A DLIST for fresh fde's
+        * A DLIST for fresh fde's added by poll_event_add_fd but not
+        * picked up yet by poll_event_loop_once
         */
        struct tevent_fd *fresh;
 
@@ -45,6 +46,11 @@ struct poll_event_context {
        struct tevent_fd **fdes;
        unsigned num_fds;
 
+       /*
+        * Signal fd to wake the poll() thread
+        */
+       int signal_fd;
+
        /* information for exiting from the event loop */
        int exit_code;
 };
@@ -61,17 +67,125 @@ static int poll_event_context_init(struct tevent_context *ev)
                return -1;
        }
        poll_ev->ev = ev;
+       poll_ev->signal_fd = -1;
        ev->additional_data = poll_ev;
        return 0;
 }
 
+static int poll_event_mt_destructor(struct poll_event_context *poll_ev)
+{
+       if (poll_ev->signal_fd != -1) {
+               close(poll_ev->signal_fd);
+               poll_ev->signal_fd = -1;
+       }
+       if (poll_ev->num_fds == 0) {
+               return 0;
+       }
+       if (poll_ev->fds[0].fd != -1) {
+               close(poll_ev->fds[0].fd);
+               poll_ev->fds[0].fd = -1;
+       }
+       return 0;
+}
+
+static bool set_nonblock(int fd)
+{
+       int val;
+
+       val = fcntl(fd, F_GETFL, 0);
+       if (val == -1) {
+               return false;
+       }
+       val |= O_NONBLOCK;
+
+       return (fcntl(fd, F_SETFL, val) != -1);
+}
+
+static int poll_event_context_init_mt(struct tevent_context *ev)
+{
+       struct poll_event_context *poll_ev;
+       struct pollfd *pfd;
+       int fds[2];
+       int ret;
+
+       ret = poll_event_context_init(ev);
+       if (ret == -1) {
+               return ret;
+       }
+
+       poll_ev = talloc_get_type_abort(
+               ev->additional_data, struct poll_event_context);
+
+       poll_ev->fds = talloc_zero(poll_ev, struct pollfd);
+       if (poll_ev->fds == NULL) {
+               return -1;
+       }
+
+       ret = pipe(fds);
+       if (ret == -1) {
+               return -1;
+       }
+
+       if (!set_nonblock(fds[0]) || !set_nonblock(fds[1])) {
+               close(fds[0]);
+               close(fds[1]);
+               return -1;
+       }
+
+       poll_ev->signal_fd = fds[1];
+
+       pfd = &poll_ev->fds[0];
+       pfd->fd = fds[0];
+       pfd->events = (POLLIN|POLLHUP);
+
+       poll_ev->num_fds = 1;
+
+       talloc_set_destructor(poll_ev, poll_event_mt_destructor);
+
+       return 0;
+}
+
+static void poll_event_wake_pollthread(struct poll_event_context *poll_ev)
+{
+       char c;
+       ssize_t ret;
+
+       if (poll_ev->signal_fd == -1) {
+               return;
+       }
+       c = 0;
+       do {
+               ret = write(poll_ev->signal_fd, &c, sizeof(c));
+       } while ((ret == -1) && (errno == EINTR));
+}
+
+static void poll_event_drain_signal_fd(struct poll_event_context *poll_ev)
+{
+       char buf[16];
+       ssize_t ret;
+       int fd;
+
+       if (poll_ev->signal_fd == -1) {
+               return;
+       }
+
+       if (poll_ev->num_fds < 1) {
+               return;
+       }
+       fd = poll_ev->fds[0].fd;
+
+       do {
+               ret = read(fd, buf, sizeof(buf));
+       } while (ret == sizeof(buf));
+}
+
 /*
   destroy an fd_event
 */
 static int poll_event_fd_destructor(struct tevent_fd *fde)
 {
        struct tevent_context *ev = fde->event_ctx;
-       struct poll_event_context *poll_ev = NULL;
+       struct poll_event_context *poll_ev;
        uint64_t del_idx = fde->additional_flags;
 
        if (ev == NULL) {
@@ -82,6 +196,7 @@ static int poll_event_fd_destructor(struct tevent_fd *fde)
                ev->additional_data, struct poll_event_context);
 
        poll_ev->fdes[del_idx] = NULL;
+       poll_event_wake_pollthread(poll_ev);
 done:
        return tevent_common_fd_destructor(fde);
 }
@@ -94,6 +209,21 @@ static int poll_fresh_fde_destructor(struct tevent_fd *fde)
        return 0;
 }
 
+static void poll_event_schedule_immediate(struct tevent_immediate *im,
+                                         struct tevent_context *ev,
+                                         tevent_immediate_handler_t handler,
+                                         void *private_data,
+                                         const char *handler_name,
+                                         const char *location)
+{
+       struct poll_event_context *poll_ev = talloc_get_type_abort(
+               ev->additional_data, struct poll_event_context);
+
+       tevent_common_schedule_immediate(im, ev, handler, private_data,
+                                        handler_name, location);
+       poll_event_wake_pollthread(poll_ev);
+}
+
 /*
   add a fd based event
   return NULL on failure (memory allocation error)
@@ -131,6 +261,7 @@ static struct tevent_fd *poll_event_add_fd(struct tevent_context *ev,
 
        DLIST_ADD(poll_ev->fresh, fde);
        talloc_set_destructor(fde, poll_fresh_fde_destructor);
+       poll_event_wake_pollthread(poll_ev);
 
        /*
         * poll_event_loop_poll will take care of the rest in
@@ -159,6 +290,7 @@ static void poll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
        poll_ev->fds[idx].events = pollflags;
 
        fde->flags = flags;
+       poll_event_wake_pollthread(poll_ev);
 }
 
 static bool poll_event_setup_fresh(struct tevent_context *ev,
@@ -246,6 +378,7 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                ev->additional_data, struct poll_event_context);
        int pollrtn;
        int timeout = -1;
+       unsigned first_fd;
        unsigned i;
 
        if (ev->signal_events && tevent_common_check_signal(ev)) {
@@ -257,6 +390,8 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                timeout += (tvalp->tv_usec + 999) / 1000;
        }
 
+       poll_event_drain_signal_fd(poll_ev);
+
        if (!poll_event_setup_fresh(ev, poll_ev)) {
                return -1;
        }
@@ -283,11 +418,13 @@ static int poll_event_loop_poll(struct tevent_context *ev,
                return 0;
        }
 
+       first_fd = (poll_ev->signal_fd != -1) ? 1 : 0;
+
        /* at least one file descriptor is ready - check
           which ones and call the handler, being careful to allow
           the handler to remove itself when called */
 
-       for (i=0; i<poll_ev->num_fds; i++) {
+       for (i=first_fd; i<poll_ev->num_fds; i++) {
                struct pollfd *pfd;
                struct tevent_fd *fde;
                uint16_t flags = 0;
@@ -379,3 +516,21 @@ _PRIVATE_ bool tevent_poll_init(void)
 {
        return tevent_register_backend("poll", &poll_event_ops);
 }
+
+static const struct tevent_ops poll_event_mt_ops = {
+       .context_init           = poll_event_context_init_mt,
+       .add_fd                 = poll_event_add_fd,
+       .set_fd_close_fn        = tevent_common_fd_set_close_fn,
+       .get_fd_flags           = tevent_common_fd_get_flags,
+       .set_fd_flags           = poll_event_set_fd_flags,
+       .add_timer              = tevent_common_add_timer,
+       .schedule_immediate     = poll_event_schedule_immediate,
+       .add_signal             = tevent_common_add_signal,
+       .loop_once              = poll_event_loop_once,
+       .loop_wait              = tevent_common_loop_wait,
+};
+
+_PRIVATE_ bool tevent_poll_mt_init(void)
+{
+       return tevent_register_backend("poll_mt", &poll_event_mt_ops);
+}