tevent: let tevent_epoll.c use new generic mpx infrastructure
authorStefan Metzmacher <metze@samba.org>
Fri, 11 Nov 2022 21:30:35 +0000 (22:30 +0100)
committerRalph Boehme <slow@samba.org>
Fri, 13 Oct 2023 09:49:33 +0000 (09:49 +0000)
This allows any number of event handlers per low level fd.

It means the epoll backend behaves like the poll backend now.

Signed-off-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Ralph Boehme <slow@samba.org>
lib/tevent/tevent_epoll.c

index a1191eb57a136e5d0fe4f5ab2e7456dbd02a7d85..43d5e37d1e2cb5fc82c02c46e4314aaae5a9ce71 100644 (file)
@@ -47,9 +47,7 @@ struct epoll_event_context {
 };
 
 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT     (1<<0)
-#define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR  (1<<1)
-#define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR     (1<<2)
-#define EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX       (1<<3)
+#define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR     (1<<1)
 
 #ifdef TEST_PANIC_FALLBACK
 
@@ -236,7 +234,12 @@ static void epoll_check_reopen(struct epoll_event_context *epoll_ev)
        epoll_ev->pid = pid;
        epoll_ev->panic_state = &panic_triggered;
        for (fde=epoll_ev->ev->fd_events;fde;fde=fde->next) {
-               tevent_common_fd_mpx_reinit(fde);
+               /*
+                * We leave the mpx mappings alive
+                * so that we'll just re-add events for
+                * the existing primary events in the loop
+                * below.
+                */
                fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
        }
        for (fde=epoll_ev->ev->fd_events;fde;fde=fde->next) {
@@ -255,7 +258,7 @@ static void epoll_check_reopen(struct epoll_event_context *epoll_ev)
 /*
  epoll cannot add the same file descriptor twice, once
  with read, once with write which is allowed by the
- tevent backend. Multiplex the existing fde, flag it
+ tevent poll backend. Multiplex the existing fde, flag it
  as such so we can search for the correct fde on
  event triggering.
 */
@@ -263,82 +266,88 @@ static void epoll_check_reopen(struct epoll_event_context *epoll_ev)
 static int epoll_add_multiplex_fd(struct epoll_event_context *epoll_ev,
                                  struct tevent_fd *add_fde)
 {
+       struct tevent_fd *primary = NULL;
+       uint16_t effective_flags;
        struct epoll_event event;
-       struct tevent_fd *mpx_fde;
+       uint64_t clear_flags = 0;
+       uint64_t add_flags = 0;
        int ret;
 
-       /* Find the existing fde that caused the EEXIST error. */
-       for (mpx_fde = epoll_ev->ev->fd_events; mpx_fde; mpx_fde = mpx_fde->next) {
-               if (mpx_fde->fd != add_fde->fd) {
-                       continue;
-               }
-
-               if (mpx_fde == add_fde) {
-                       continue;
-               }
-
-               break;
-       }
-       if (mpx_fde == NULL) {
-               tevent_debug(epoll_ev->ev, TEVENT_DEBUG_FATAL,
-                            "can't find multiplex fde for fd[%d]",
-                            add_fde->fd);
+       /*
+        * Check if there is another fde we can attach to
+        */
+       primary = tevent_common_fd_mpx_add(add_fde);
+       if (primary == NULL) {
+               /* the caller calls epoll_panic() */
                return -1;
        }
 
-       if (mpx_fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX) {
-               /* Logic error. Can't have more than 2 multiplexed fde's. */
-               tevent_debug(epoll_ev->ev, TEVENT_DEBUG_FATAL,
-                            "multiplex fde for fd[%d] is already multiplexed\n",
-                            mpx_fde->fd);
-               return -1;
+       /*
+        * First propagate the HAS_EVENT flag from
+        * the primary to all others (mainly add_fde)
+        */
+       if (primary->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
+               add_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+               tevent_common_fd_mpx_additional_flags(primary, 0, add_flags);
        }
 
        /*
-        * The multiplex fde must have the same fd, and also
-        * already have an epoll event attached.
+        * Update the mpx internals and check if
+        * there is an update needed.
         */
-       if (!(mpx_fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) {
-               /* Logic error. Can't have more than 2 multiplexed fde's. */
-               tevent_debug(epoll_ev->ev, TEVENT_DEBUG_FATAL,
-                            "multiplex fde for fd[%d] has no event\n",
-                            mpx_fde->fd);
-               return -1;
+       primary = tevent_common_fd_mpx_update(primary);
+       if (primary == NULL) {
+               /*
+                * It seems the primary was already
+                * watching (at least) the same flags
+                * as add_fde, so we are done.
+                */
+               return 0;
        }
 
-       /* Modify the mpx_fde to add in the new flags. */
+       /*
+        * Before me modify the low level epoll state,
+        * we clear HAS_EVENT on all fdes.
+        */
+       clear_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+       tevent_common_fd_mpx_additional_flags(primary, clear_flags, 0);
+
+       effective_flags = tevent_common_fd_mpx_flags(primary);
+
+       /*
+        * Modify the low level epoll state to reflect
+        * the effective flags we want to monitor.
+        */
        ZERO_STRUCT(event);
-       event.events = epoll_map_flags(mpx_fde->flags);
-       event.events |= epoll_map_flags(add_fde->flags);
-       event.data.ptr = mpx_fde;
-       ret = epoll_ctl(epoll_ev->epoll_fd, EPOLL_CTL_MOD, mpx_fde->fd, &event);
+       event.events = epoll_map_flags(effective_flags);
+       event.data.ptr = primary;
+       ret = epoll_ctl(epoll_ev->epoll_fd,
+                       EPOLL_CTL_MOD,
+                       primary->fd,
+                       &event);
        if (ret != 0 && errno == EBADF) {
-               tevent_debug(epoll_ev->ev, TEVENT_DEBUG_ERROR,
+               struct tevent_common_fd_buf pbuf = {};
+               TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_ERROR,
                             "EPOLL_CTL_MOD EBADF for "
-                            "add_fde[%p] mpx_fde[%p] fd[%d] - disabling\n",
-                            add_fde, mpx_fde, add_fde->fd);
-               tevent_common_fd_disarm(mpx_fde);
-               tevent_common_fd_disarm(add_fde);
+                            "%s - disabling\n",
+                            tevent_common_fd_str(&pbuf, "primary", primary));
+               tevent_common_fd_mpx_disarm_all(primary);
                return 0;
        } else if (ret != 0) {
+               struct tevent_common_fd_buf pbuf = {};
+               TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_FATAL,
+                            "EPOLL_CTL_MOD for %s - failed - %s",
+                            tevent_common_fd_str(&pbuf, "primary", primary),
+                            strerror(errno));
+               /* the caller calls epoll_panic() */
                return ret;
        }
 
        /*
-        * Make each fde->additional_data pointers point at each other
-        * so we can look them up from each other. They are now paired.
+        * Finally re-add HAS_EVENT to all fdes
         */
-       mpx_fde->additional_data = (struct tevent_fd *)add_fde;
-       add_fde->additional_data = (struct tevent_fd *)mpx_fde;
-
-       /* Now flag both fde's as being multiplexed. */
-       mpx_fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX;
-       add_fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX;
-
-       /* we need to keep the GOT_ERROR flag */
-       if (mpx_fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR) {
-               add_fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
-       }
+       add_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+       tevent_common_fd_mpx_additional_flags(primary, 0, add_flags);
 
        return 0;
 }
@@ -346,118 +355,121 @@ static int epoll_add_multiplex_fd(struct epoll_event_context *epoll_ev,
 /*
  add the epoll event to the given fd_event
 */
-static void epoll_add_event(struct epoll_event_context *epoll_ev, struct tevent_fd *fde)
+static void epoll_add_event(struct epoll_event_context *epoll_ev,
+                           struct tevent_fd *_primary)
 {
+       struct tevent_fd *primary = tevent_common_fd_mpx_primary(_primary);
+       uint16_t effective_flags = tevent_common_fd_mpx_flags(primary);
        struct epoll_event event;
+       uint64_t clear_flags = 0;
+       uint64_t add_flags = 0;
        int ret;
-       struct tevent_fd *mpx_fde = NULL;
-
-       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-
-       if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX) {
-               /*
-                * This is a multiplexed fde, we need to include both
-                * flags in the modified event.
-                */
-               mpx_fde = talloc_get_type_abort(fde->additional_data,
-                                               struct tevent_fd);
 
-               mpx_fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-               mpx_fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-       }
+       /*
+        * Before me modify the low level epoll state,
+        * we clear HAS_EVENT on all fdes.
+        */
+       clear_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+       tevent_common_fd_mpx_additional_flags(primary, clear_flags, 0);
 
+       /*
+        * Modify the low level epoll state to reflect
+        * the effective flags we want to monitor.
+        *
+        * Most likely we won't trigger the EEXIST
+        * case, so it's much cheaper to try and
+        * react on EEXIST if needed, than to always
+        * scan the list of all existing events.
+        */
        ZERO_STRUCT(event);
-       event.events = epoll_map_flags(fde->flags);
-       if (mpx_fde != NULL) {
-               event.events |= epoll_map_flags(mpx_fde->flags);
-       }
-       event.data.ptr = fde;
-       ret = epoll_ctl(epoll_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
+       event.events = epoll_map_flags(effective_flags);
+       event.data.ptr = primary;
+       ret = epoll_ctl(epoll_ev->epoll_fd,
+                       EPOLL_CTL_ADD,
+                       primary->fd,
+                       &event);
        if (ret != 0 && errno == EBADF) {
-               tevent_debug(epoll_ev->ev, TEVENT_DEBUG_ERROR,
+               struct tevent_common_fd_buf pbuf = {};
+               TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_ERROR,
                             "EPOLL_CTL_ADD EBADF for "
-                            "fde[%p] mpx_fde[%p] fd[%d] - disabling\n",
-                            fde, mpx_fde, fde->fd);
-               tevent_common_fd_disarm(fde);
-               if (mpx_fde != NULL) {
-                       tevent_common_fd_disarm(mpx_fde);
-               }
+                            "%s - disabling\n",
+                            tevent_common_fd_str(&pbuf, "primary", primary));
+               tevent_common_fd_mpx_disarm_all(primary);
                return;
-       } else if (ret != 0 && errno == EEXIST && mpx_fde == NULL) {
-               ret = epoll_add_multiplex_fd(epoll_ev, fde);
+       } else if (ret != 0 && errno == EEXIST) {
+               ret = epoll_add_multiplex_fd(epoll_ev, primary);
                if (ret != 0) {
                        epoll_panic(epoll_ev, "epoll_add_multiplex_fd failed",
                                    false);
                        return;
                }
+               /*
+                * epoll_add_multiplex_fd() already
+                * added EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT
+                */
+               return;
        } else if (ret != 0) {
                epoll_panic(epoll_ev, "EPOLL_CTL_ADD failed", false);
                return;
        }
 
-       fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       /* only if we want to read we want to tell the event handler about errors */
-       if (fde->flags & TEVENT_FD_READ) {
-               fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-       }
-
-       if (mpx_fde == NULL) {
-               return;
-       }
-
-       mpx_fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       /* only if we want to read we want to tell the event handler about errors */
-       if (mpx_fde->flags & TEVENT_FD_READ) {
-               mpx_fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-       }
+       /*
+        * Finally re-add HAS_EVENT to all fdes
+        */
+       add_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+       tevent_common_fd_mpx_additional_flags(primary, 0, add_flags);
 }
 
 /*
  delete the epoll event for given fd_event
 */
-static void epoll_del_event(struct epoll_event_context *epoll_ev, struct tevent_fd *fde)
+static void epoll_del_event(struct epoll_event_context *epoll_ev,
+                           struct tevent_fd *_primary)
 {
+       struct tevent_fd *primary = tevent_common_fd_mpx_primary(_primary);
        struct epoll_event event;
+       uint64_t clear_flags = 0;
        int ret;
-       struct tevent_fd *mpx_fde = NULL;
 
-       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-
-       if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX) {
-               /*
-                * This is a multiplexed fde, we need to modify both events.
-                */
-               mpx_fde = talloc_get_type_abort(fde->additional_data,
-                                               struct tevent_fd);
-
-               mpx_fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-               mpx_fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-       }
+       /*
+        * Before me delete the low level epoll state,
+        * we clear HAS_EVENT on all fdes.
+        */
+       clear_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+       tevent_common_fd_mpx_additional_flags(primary, clear_flags, 0);
 
+       /*
+        * Delete the low level epoll state to reflect
+        * the effective flags we want to monitor.
+        */
        ZERO_STRUCT(event);
-       ret = epoll_ctl(epoll_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
+       ret = epoll_ctl(epoll_ev->epoll_fd,
+                       EPOLL_CTL_DEL,
+                       primary->fd,
+                       &event);
        if (ret != 0 && errno == ENOENT) {
+               struct tevent_common_fd_buf pbuf = {};
                /*
                 * This can happen after a epoll_check_reopen
                 * within epoll_event_fd_destructor.
                 */
                TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_TRACE,
-                            "EPOLL_CTL_DEL ignoring ENOENT for fd[%d]\n",
-                            fde->fd);
+                            "EPOLL_CTL_DEL ignoring ENOENT for %s\n",
+                            tevent_common_fd_str(&pbuf, "primary", primary));
                return;
        } else if (ret != 0 && errno == EBADF) {
-               tevent_debug(epoll_ev->ev, TEVENT_DEBUG_WARNING,
-                            "EPOLL_CTL_DEL EBADF for "
-                            "fde[%p] mpx_fde[%p] fd[%d] - disabling\n",
-                            fde, mpx_fde, fde->fd);
-               tevent_common_fd_disarm(fde);
-               if (mpx_fde != NULL) {
-                       tevent_common_fd_disarm(mpx_fde);
-               }
+               struct tevent_common_fd_buf pbuf = {};
+               TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_WARNING,
+                            "EPOLL_CTL_DEL EBADF for %s - disabling\n",
+                            tevent_common_fd_str(&pbuf, "primary", primary));
+               tevent_common_fd_mpx_disarm_all(primary);
                return;
        } else if (ret != 0) {
+               struct tevent_common_fd_buf pbuf = {};
+               TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_FATAL,
+                            "EPOLL_CTL_DEL for %s - failed - %s",
+                            tevent_common_fd_str(&pbuf, "primary", primary),
+                            strerror(errno));
                epoll_panic(epoll_ev, "EPOLL_CTL_DEL failed", false);
                return;
        }
@@ -466,93 +478,71 @@ static void epoll_del_event(struct epoll_event_context *epoll_ev, struct tevent_
 /*
  change the epoll event to the given fd_event
 */
-static void epoll_mod_event(struct epoll_event_context *epoll_ev, struct tevent_fd *fde)
+static void epoll_mod_event(struct epoll_event_context *epoll_ev,
+                           struct tevent_fd *_primary)
 {
-       struct tevent_fd *mpx_fde = NULL;
+       struct tevent_fd *primary = tevent_common_fd_mpx_primary(_primary);
+       uint16_t effective_flags = tevent_common_fd_mpx_flags(primary);
        struct epoll_event event;
+       uint64_t clear_flags = 0;
+       uint64_t add_flags = 0;
        int ret;
 
-       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-
-       if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX) {
-               /*
-                * This is a multiplexed fde, we need to include both
-                * flags in the modified event.
-                */
-               mpx_fde = talloc_get_type_abort(fde->additional_data,
-                                               struct tevent_fd);
-
-               mpx_fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-               mpx_fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-       }
+       /*
+        * Before me modify the low level epoll state,
+        * we clear HAS_EVENT on all fdes.
+        */
+       clear_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+       tevent_common_fd_mpx_additional_flags(primary, clear_flags, 0);
 
+       /*
+        * Modify the low level epoll state to reflect
+        * the effective flags we want to monitor.
+        */
        ZERO_STRUCT(event);
-       event.events = epoll_map_flags(fde->flags);
-       if (mpx_fde != NULL) {
-               event.events |= epoll_map_flags(mpx_fde->flags);
-       }
-       event.data.ptr = fde;
-       ret = epoll_ctl(epoll_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
+       event.events = epoll_map_flags(effective_flags);
+       event.data.ptr = primary;
+       ret = epoll_ctl(epoll_ev->epoll_fd,
+                       EPOLL_CTL_MOD,
+                       primary->fd,
+                       &event);
        if (ret != 0 && errno == EBADF) {
-               tevent_debug(epoll_ev->ev, TEVENT_DEBUG_ERROR,
-                            "EPOLL_CTL_MOD EBADF for "
-                            "fde[%p] mpx_fde[%p] fd[%d] - disabling\n",
-                            fde, mpx_fde, fde->fd);
-               tevent_common_fd_disarm(fde);
-               if (mpx_fde != NULL) {
-                       tevent_common_fd_disarm(mpx_fde);
-               }
+               struct tevent_common_fd_buf pbuf = {};
+               TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_ERROR,
+                            "EPOLL_CTL_MOD EBADF for %s - disabling\n",
+                            tevent_common_fd_str(&pbuf, "primary", primary));
+               tevent_common_fd_mpx_disarm_all(primary);
                return;
        } else if (ret != 0) {
+               struct tevent_common_fd_buf pbuf = {};
+               TEVENT_DEBUG(epoll_ev->ev, TEVENT_DEBUG_FATAL,
+                            "EPOLL_CTL_MOD for %s - failed - %s",
+                            tevent_common_fd_str(&pbuf, "primary", primary),
+                            strerror(errno));
                epoll_panic(epoll_ev, "EPOLL_CTL_MOD failed", false);
                return;
        }
 
-       fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       /* only if we want to read we want to tell the event handler about errors */
-       if (fde->flags & TEVENT_FD_READ) {
-               fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-       }
-
-       if (mpx_fde == NULL) {
-               return;
-       }
-
-       mpx_fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       /* only if we want to read we want to tell the event handler about errors */
-       if (mpx_fde->flags & TEVENT_FD_READ) {
-               mpx_fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
-       }
+       /*
+        * Finally re-add HAS_EVENT to all fdes
+        */
+       add_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+       tevent_common_fd_mpx_additional_flags(primary, 0, add_flags);
 }
 
 static void epoll_update_event(struct epoll_event_context *epoll_ev, struct tevent_fd *fde)
 {
-       bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
-       bool want_read = (fde->flags & TEVENT_FD_READ);
-       bool want_write= (fde->flags & TEVENT_FD_WRITE);
-       struct tevent_fd *mpx_fde = NULL;
-
-       if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX) {
-               /*
-                * work out what the multiplexed fde wants.
-                */
-               mpx_fde = talloc_get_type_abort(fde->additional_data,
-                                               struct tevent_fd);
-
-               if (mpx_fde->flags & TEVENT_FD_READ) {
-                       want_read = true;
-               }
-
-               if (mpx_fde->flags & TEVENT_FD_WRITE) {
-                       want_write = true;
-               }
-       }
+       struct tevent_fd *primary = tevent_common_fd_mpx_primary(fde);
+       uint64_t _paf = primary->additional_flags;
+       bool got_error = (_paf & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
+       uint16_t effective_flags = tevent_common_fd_mpx_flags(primary);
+       bool want_read = (effective_flags & TEVENT_FD_READ);
+       bool want_write= (effective_flags & TEVENT_FD_WRITE);
 
        /* there's already an event */
-       if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
+       if (primary->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
                if (want_read || (want_write && !got_error)) {
-                       epoll_mod_event(epoll_ev, fde);
+                       epoll_mod_event(epoll_ev, primary);
                        return;
                }
                /* 
@@ -561,49 +551,17 @@ static void epoll_update_event(struct epoll_event_context *epoll_ev, struct teve
                 *
                 * this is because epoll reports EPOLLERR and EPOLLHUP, even without asking for them
                 */
-               epoll_del_event(epoll_ev, fde);
+               epoll_del_event(epoll_ev, primary);
                return;
        }
 
        /* there's no epoll_event attached to the fde */
        if (want_read || (want_write && !got_error)) {
-               epoll_add_event(epoll_ev, fde);
+               epoll_add_event(epoll_ev, primary);
                return;
        }
 }
 
-/*
-  Cope with epoll returning EPOLLHUP|EPOLLERR on an event.
-  Return true if there's nothing else to do, false if
-  this event needs further handling.
-*/
-static bool epoll_handle_hup_or_err(struct epoll_event_context *epoll_ev,
-                               struct tevent_fd *fde)
-{
-       if (fde == NULL) {
-               /* Nothing to do if no event. */
-               return true;
-       }
-
-       fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
-       /*
-        * if we only wait for TEVENT_FD_WRITE, we should not tell the
-        * event handler about it, and remove the epoll_event,
-        * as we only report errors when waiting for read events,
-        * to match the select() behavior
-        */
-       if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
-               /*
-                * Do the same as the poll backend and
-                * remove the writeable flag.
-                */
-               fde->flags &= ~TEVENT_FD_WRITE;
-               return true;
-       }
-       /* This has TEVENT_FD_READ set, we're not finished. */
-       return false;
-}
-
 /*
   event loop handling using epoll
 */
@@ -650,70 +608,71 @@ static int epoll_event_loop(struct epoll_event_context *epoll_ev, struct timeval
        for (i=0;i<ret;i++) {
                struct tevent_fd *fde = talloc_get_type(events[i].data.ptr,
                                                       struct tevent_fd);
+               struct tevent_fd *selected = NULL;
+               uint16_t effective_flags;
                uint16_t flags = 0;
-               struct tevent_fd *mpx_fde = NULL;
+               bool got_error = false;
 
                if (fde == NULL) {
                        epoll_panic(epoll_ev, "epoll_wait() gave bad data", true);
                        return -1;
                }
-               if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX) {
-                       /*
-                        * Save off the multiplexed event in case we need
-                        * to use it to call the handler function.
-                        */
-                       mpx_fde = talloc_get_type_abort(fde->additional_data,
-                                                       struct tevent_fd);
-               }
+               effective_flags = tevent_common_fd_mpx_flags(fde);
                if (events[i].events & (EPOLLHUP|EPOLLERR)) {
-                       bool handled_fde = epoll_handle_hup_or_err(epoll_ev, fde);
-                       bool handled_mpx = epoll_handle_hup_or_err(epoll_ev, mpx_fde);
+                       uint64_t add_flags = 0;
 
-                       if (handled_fde && handled_mpx) {
-                               epoll_update_event(epoll_ev, fde);
-                               continue;
-                       }
+                       add_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
+                       tevent_common_fd_mpx_additional_flags(fde,
+                                                             0,
+                                                             add_flags);
 
-                       if (!handled_mpx) {
-                               /*
-                                * If the mpx event was the one that needs
-                                * further handling, it's the TEVENT_FD_READ
-                                * event so switch over and call that handler.
-                                */
-                               fde = mpx_fde;
-                               mpx_fde = NULL;
+                       if (effective_flags & TEVENT_FD_READ) {
+                               flags |= TEVENT_FD_READ;
                        }
-                       flags |= TEVENT_FD_READ;
                }
-               if (events[i].events & EPOLLIN) flags |= TEVENT_FD_READ;
-               if (events[i].events & EPOLLOUT) flags |= TEVENT_FD_WRITE;
-
-               if (flags & TEVENT_FD_WRITE) {
-                       if (fde->flags & TEVENT_FD_WRITE) {
-                               mpx_fde = NULL;
+               if (events[i].events & EPOLLIN) {
+                       if (effective_flags & TEVENT_FD_READ) {
+                               flags |= TEVENT_FD_READ;
                        }
-                       if (mpx_fde && mpx_fde->flags & TEVENT_FD_WRITE) {
-                               fde = mpx_fde;
-                               mpx_fde = NULL;
+               }
+               if (events[i].events & EPOLLOUT) {
+                       if (effective_flags & TEVENT_FD_WRITE) {
+                               flags |= TEVENT_FD_WRITE;
                        }
                }
 
-               if (mpx_fde) {
-                       /* Ensure we got the right fde. */
-                       if ((flags & fde->flags) == 0) {
-                               fde = mpx_fde;
-                               mpx_fde = NULL;
+               if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR)
+               {
+                       got_error = true;
+               }
+
+               selected = tevent_common_fd_mpx_select(fde, flags, got_error);
+               if (selected == NULL) {
+                       if (got_error) {
+                               /*
+                                * if we only wait for TEVENT_FD_WRITE, we
+                                * should not tell the event handler about it,
+                                * and remove the epoll_event, as we only
+                                * report errors when waiting for read events,
+                                * to match the select() behavior
+                                *
+                                * Do the same as the poll backend and
+                                * remove the writeable flag.
+                                */
+                               tevent_common_fd_mpx_clear_writeable(fde);
+                               epoll_update_event(epoll_ev, fde);
                        }
+                       continue;
                }
 
                /*
                 * make sure we only pass the flags
                 * the handler is expecting.
                 */
-               flags &= fde->flags;
-               if (flags) {
-                       return tevent_common_invoke_fd_handler(fde, flags, NULL);
-               }
+               flags &= selected->flags;
+               return tevent_common_invoke_fd_handler(selected,
+                                                      flags,
+                                                      NULL);
        }
 
        return 0;
@@ -753,11 +712,12 @@ static int epoll_event_context_init(struct tevent_context *ev)
 */
 static int epoll_event_fd_destructor(struct tevent_fd *fde)
 {
+       struct tevent_fd *old_primary = NULL;
+       struct tevent_fd *new_primary = NULL;
+       struct tevent_fd *update_primary = NULL;
        struct tevent_context *ev = fde->event_ctx;
        struct epoll_event_context *epoll_ev = NULL;
        bool panic_triggered = false;
-       struct tevent_fd *mpx_fde = NULL;
-       int flags = fde->flags;
 
        if (ev == NULL) {
                tevent_common_fd_mpx_reinit(fde);
@@ -774,19 +734,6 @@ static int epoll_event_fd_destructor(struct tevent_fd *fde)
         */
        DLIST_REMOVE(ev->fd_events, fde);
 
-       if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX) {
-               mpx_fde = talloc_get_type_abort(fde->additional_data,
-                                               struct tevent_fd);
-
-               fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX;
-               mpx_fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_MPX;
-
-               fde->additional_data = NULL;
-               mpx_fde->additional_data = NULL;
-
-               fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
-       }
-
        epoll_ev->panic_state = &panic_triggered;
        if (epoll_ev->pid != tevent_cached_getpid()) {
                epoll_check_reopen(epoll_ev);
@@ -796,17 +743,28 @@ static int epoll_event_fd_destructor(struct tevent_fd *fde)
                }
        }
 
-       if (mpx_fde != NULL) {
-               epoll_update_event(epoll_ev, mpx_fde);
+       old_primary = tevent_common_fd_mpx_primary(fde);
+
+       if (old_primary == fde) {
+               epoll_del_event(epoll_ev, fde);
                if (panic_triggered) {
                        tevent_common_fd_mpx_reinit(fde);
                        return tevent_common_fd_destructor(fde);
                }
        }
 
-       fde->flags = 0;
-       epoll_update_event(epoll_ev, fde);
-       fde->flags = flags;
+       new_primary = tevent_common_fd_mpx_remove(fde);
+       if (new_primary == NULL) {
+               epoll_ev->panic_state = NULL;
+               return tevent_common_fd_destructor(fde);
+       }
+       update_primary = tevent_common_fd_mpx_update(new_primary);
+       if (update_primary == NULL) {
+               epoll_ev->panic_state = NULL;
+               return tevent_common_fd_destructor(fde);
+       }
+
+       epoll_update_event(epoll_ev, update_primary);
        if (panic_triggered) {
                return tevent_common_fd_destructor(fde);
        }
@@ -840,6 +798,12 @@ static struct tevent_fd *epoll_event_add_fd(struct tevent_context *ev, TALLOC_CT
 
        talloc_set_destructor(fde, epoll_event_fd_destructor);
 
+       /*
+        * prepare for tevent_common_fd_mpx_flags()
+        * in epoll_update_event()
+        */
+       tevent_common_fd_mpx_update_flags(fde);
+
        if (epoll_ev->pid != tevent_cached_getpid()) {
                epoll_ev->panic_state = &panic_triggered;
                epoll_check_reopen(epoll_ev);
@@ -874,6 +838,11 @@ static void epoll_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
        old_pid = epoll_ev->pid;
 
        fde->flags = flags;
+       /*
+        * prepare for tevent_common_fd_mpx_flags()
+        * in epoll_update_event()
+        */
+       tevent_common_fd_mpx_update_flags(fde);
 
        if (epoll_ev->pid != tevent_cached_getpid()) {
                epoll_ev->panic_state = &panic_triggered;