lib: Enhance poll_funcs_tevent for multiple tevent_contexts
authorVolker Lendecke <vl@samba.org>
Mon, 5 May 2014 06:45:52 +0000 (08:45 +0200)
committerVolker Lendecke <vl@samba.org>
Thu, 8 May 2014 07:10:12 +0000 (09:10 +0200)
With this patch it will be possible to use nested event contexts with
messaging_filtered_read_send/recv. Before this patchset only the one and only
event context a messaging_context is initialized with is able to receive
datagrams from the unix domain socket. So if you want to code a synchronous
RPC-like operation using a nested event context, you will not see the reply,
because the nested event context does not have the required tevent_fd's.
Unfortunately, this patchset has to add some advanced array voodoo. The idea
is that state->watches[] contains what we hand out with watch_new, and
state->contexts contains references to the tevent_contexts. For every watch we
need a tevent_fd in every event context, and the routines make sure that the
arrays are properly maintained.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/lib/messages_dgm.c
source3/lib/poll_funcs/poll_funcs_tevent.c
source3/lib/poll_funcs/poll_funcs_tevent.h
source3/lib/unix_msg/test_drain.c
source3/lib/unix_msg/test_source.c
source3/lib/unix_msg/tests.c

index 354dac36773b305815d3636a53bc566f976471cd..56643b1ffd606a868b262470ca431bbba079b4e3 100644 (file)
@@ -30,7 +30,8 @@
 
 struct messaging_dgm_context {
        struct messaging_context *msg_ctx;
-       struct poll_funcs msg_callbacks;
+       struct poll_funcs *msg_callbacks;
+       void *tevent_handle;
        struct unix_msg_ctx *dgm_ctx;
        char *cache_dir;
        int lockfile_fd;
@@ -224,7 +225,18 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
                return map_nt_error_from_unix(ret);
        }
 
-       poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx);
+       ctx->msg_callbacks = poll_funcs_init_tevent(ctx);
+       if (ctx->msg_callbacks == NULL) {
+               TALLOC_FREE(result);
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       ctx->tevent_handle = poll_funcs_tevent_register(
+               ctx, ctx->msg_callbacks, msg_ctx->event_ctx);
+       if (ctx->tevent_handle == NULL) {
+               TALLOC_FREE(result);
+               return NT_STATUS_NO_MEMORY;
+       }
 
        ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(),
                                              0700);
@@ -239,7 +251,7 @@ NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
 
        generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
 
-       ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie,
+       ret = unix_msg_init(socket_name, ctx->msg_callbacks, 1024, cookie,
                            messaging_dgm_recv, ctx, &ctx->dgm_ctx);
        TALLOC_FREE(socket_name);
        if (ret != 0) {
index 6e750429badc171e60e3f34c49f67fbfbad946b7..ee800badb41d1fb25b85d3e662bbc28959619d6e 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
+ * Copyright (C) Volker Lendecke 2013,2014
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
 #include "tevent.h"
 #include "system/select.h"
 
+/*
+ * A poll_watch is asked for by the engine using this library via
+ * funcs->watch_new(). It represents interest in "fd" becoming readable or
+ * writable.
+ */
+
 struct poll_watch {
-       struct tevent_fd *fde;
+       struct poll_funcs_state *state;
+       unsigned slot;          /* index into state->watches[] */
        int fd;
+       int events;
        void (*callback)(struct poll_watch *w, int fd, short events,
                         void *private_data);
        void *private_data;
 };
 
+struct poll_funcs_state {
+       /*
+        * "watches" is the array of all watches that we have handed out via
+        * funcs->watch_new(). The "watches" array can contain NULL pointers.
+        */
+       unsigned num_watches;
+       struct poll_watch **watches;
+
+       /*
+        * "contexts is the array of tevent_contexts that serve
+        * "watches". "contexts" can contain NULL pointers.
+        */
+       unsigned num_contexts;
+       struct poll_funcs_tevent_context **contexts;
+};
+
+struct poll_funcs_tevent_context {
+       unsigned refcount;
+       struct poll_funcs_state *state;
+       unsigned slot;          /* index into state->contexts[] */
+       struct tevent_context *ev;
+       struct tevent_fd **fdes; /* same indexes as state->watches[] */
+};
+
+/*
+ * poll_funcs_tevent_register() hands out a struct poll_funcs_tevent_handle as
+ * a void *. poll_funcs_tevent_register allows tevent_contexts to be
+ * registered multiple times, and we can't add a tevent_fd for the same fd's
+ * multiple times. So we have to share one poll_funcs_tevent_context.
+ */
+struct poll_funcs_tevent_handle {
+       struct poll_funcs_tevent_context *ctx;
+};
+
 static uint16_t poll_events_to_tevent(short events)
 {
        uint16_t ret = 0;
@@ -54,9 +96,56 @@ static short tevent_to_poll_events(uint16_t flags)
        return ret;
 }
 
-static void tevent_watch_handler(struct tevent_context *ev,
-                                struct tevent_fd *fde, uint16_t flags,
-                                void *private_data);
+/*
+ * Find or create a free slot in state->watches[]
+ */
+static bool poll_funcs_watch_find_slot(struct poll_funcs_state *state,
+                                      unsigned *slot)
+{
+       struct poll_watch **watches;
+       unsigned i;
+
+       for (i=0; i<state->num_watches; i++) {
+               if (state->watches[i] == NULL) {
+                       *slot = i;
+                       return true;
+               }
+       }
+
+       watches = talloc_realloc(state, state->watches, struct poll_watch *,
+                                state->num_watches + 1);
+       if (watches == NULL) {
+               return false;
+       }
+       watches[state->num_watches] = NULL;
+       state->watches = watches;
+
+       for (i=0; i<state->num_contexts; i++) {
+               struct tevent_fd **fdes;
+               struct poll_funcs_tevent_context *c = state->contexts[i];
+               if (c == NULL) {
+                       continue;
+               }
+               fdes = talloc_realloc(c, c->fdes, struct tevent_fd *,
+                                     state->num_watches + 1);
+               if (fdes == NULL) {
+                       return false;
+               }
+               c->fdes = fdes;
+
+               fdes[state->num_watches] = NULL;
+       }
+
+       *slot = state->num_watches;
+       state->num_watches += 1;
+
+       return true;
+}
+
+static void poll_funcs_fde_handler(struct tevent_context *ev,
+                                  struct tevent_fd *fde, uint16_t flags,
+                                  void *private_data);
+static int poll_watch_destructor(struct poll_watch *w);
 
 static struct poll_watch *tevent_watch_new(
        const struct poll_funcs *funcs, int fd, short events,
@@ -64,45 +153,87 @@ static struct poll_watch *tevent_watch_new(
                         void *private_data),
        void *private_data)
 {
-       struct tevent_context *ev = talloc_get_type_abort(
-               funcs->private_data, struct tevent_context);
+       struct poll_funcs_state *state = talloc_get_type_abort(
+               funcs->private_data, struct poll_funcs_state);
        struct poll_watch *w;
+       unsigned i, slot;
 
-       w = talloc(ev, struct poll_watch);
-       if (w == NULL) {
+       if (!poll_funcs_watch_find_slot(state, &slot)) {
                return NULL;
        }
-       w->fde = tevent_add_fd(ev, w, fd, poll_events_to_tevent(events),
-                              tevent_watch_handler, w);
-       if (w->fde == NULL) {
-               TALLOC_FREE(w);
+
+       w = talloc(state->watches, struct poll_watch);
+       if (w == NULL) {
                return NULL;
        }
+       w->state = state;
+       w->slot = slot;
+       w->fd = fd;
+       w->events = poll_events_to_tevent(events);
        w->fd = fd;
        w->callback = callback;
        w->private_data = private_data;
+       state->watches[slot] = w;
+
+       talloc_set_destructor(w, poll_watch_destructor);
+
+       for (i=0; i<state->num_contexts; i++) {
+               struct poll_funcs_tevent_context *c = state->contexts[i];
+               if (c == NULL) {
+                       continue;
+               }
+               c->fdes[slot] = tevent_add_fd(c->ev, c->fdes, w->fd, w->events,
+                                             poll_funcs_fde_handler, w);
+               if (c->fdes[slot] == NULL) {
+                       goto fail;
+               }
+       }
        return w;
+
+fail:
+       TALLOC_FREE(w);
+       return NULL;
 }
 
-static void tevent_watch_handler(struct tevent_context *ev,
-                                struct tevent_fd *fde, uint16_t flags,
-                                void *private_data)
+static int poll_watch_destructor(struct poll_watch *w)
 {
-       struct poll_watch *w = talloc_get_type_abort(
-               private_data, struct poll_watch);
+       struct poll_funcs_state *state = w->state;
+       unsigned slot = w->slot;
+       unsigned i;
+
+       TALLOC_FREE(state->watches[slot]);
+
+       for (i=0; i<state->num_contexts; i++) {
+               struct poll_funcs_tevent_context *c = state->contexts[i];
+               if (c == NULL) {
+                       continue;
+               }
+               TALLOC_FREE(c->fdes[slot]);
+       }
 
-       w->callback(w, w->fd, tevent_to_poll_events(flags),
-                   w->private_data);
+       return 0;
 }
 
 static void tevent_watch_update(struct poll_watch *w, short events)
 {
-       tevent_fd_set_flags(w->fde, poll_events_to_tevent(events));
+       struct poll_funcs_state *state = w->state;
+       unsigned slot = w->slot;
+       unsigned i;
+
+       w->events = poll_events_to_tevent(events);
+
+       for (i=0; i<state->num_contexts; i++) {
+               struct poll_funcs_tevent_context *c = state->contexts[i];
+               if (c == NULL) {
+                       continue;
+               }
+               tevent_fd_set_flags(c->fdes[slot], w->events);
+       }
 }
 
 static short tevent_watch_get_events(struct poll_watch *w)
 {
-       return tevent_to_poll_events(tevent_fd_get_flags(w->fde));
+       return tevent_to_poll_events(w->events);
 }
 
 static void tevent_watch_free(struct poll_watch *w)
@@ -130,8 +261,24 @@ static void tevent_timeout_free(struct poll_timeout *t)
        return;
 }
 
-void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev)
+static int poll_funcs_state_destructor(struct poll_funcs_state *state);
+
+struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx)
 {
+       struct poll_funcs *f;
+       struct poll_funcs_state *state;
+
+       f = talloc(mem_ctx, struct poll_funcs);
+       if (f == NULL) {
+               return NULL;
+       }
+       state = talloc_zero(f, struct poll_funcs_state);
+       if (state == NULL) {
+               TALLOC_FREE(f);
+               return NULL;
+       }
+       talloc_set_destructor(state, poll_funcs_state_destructor);
+
        f->watch_new = tevent_watch_new;
        f->watch_update = tevent_watch_update;
        f->watch_get_events = tevent_watch_get_events;
@@ -139,5 +286,166 @@ void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev)
        f->timeout_new = tevent_timeout_new;
        f->timeout_update = tevent_timeout_update;
        f->timeout_free = tevent_timeout_free;
-       f->private_data = ev;
+       f->private_data = state;
+       return f;
+}
+
+static int poll_funcs_state_destructor(struct poll_funcs_state *state)
+{
+       unsigned i;
+       /*
+        * Make sure the watches are cleared before the contexts. The watches
+        * have destructors attached to them that clean up the fde's
+        */
+       for (i=0; i<state->num_watches; i++) {
+               TALLOC_FREE(state->watches[i]);
+       }
+       return 0;
+}
+
+/*
+ * Find or create a free slot in state->contexts[]
+ */
+static bool poll_funcs_context_slot_find(struct poll_funcs_state *state,
+                                        struct tevent_context *ev,
+                                        unsigned *slot)
+{
+       struct poll_funcs_tevent_context **contexts;
+       unsigned i;
+
+       for (i=0; i<state->num_contexts; i++) {
+               struct poll_funcs_tevent_context *ctx = state->contexts[i];
+
+               if ((ctx == NULL) || (ctx->ev == ev)) {
+                       *slot = i;
+                       return true;
+               }
+       }
+
+       contexts = talloc_realloc(state, state->contexts,
+                                 struct poll_funcs_tevent_context *,
+                                 state->num_contexts + 1);
+       if (contexts == NULL) {
+               return false;
+       }
+       state->contexts = contexts;
+       state->contexts[state->num_contexts] = NULL;
+
+       *slot = state->num_contexts;
+       state->num_contexts += 1;
+
+       return true;
+}
+
+static int poll_funcs_tevent_context_destructor(
+       struct poll_funcs_tevent_context *ctx);
+
+static struct poll_funcs_tevent_context *poll_funcs_tevent_context_new(
+       TALLOC_CTX *mem_ctx, struct poll_funcs_state *state,
+       struct tevent_context *ev, unsigned slot)
+{
+       struct poll_funcs_tevent_context *ctx;
+       unsigned i;
+
+       ctx = talloc(mem_ctx, struct poll_funcs_tevent_context);
+       if (ctx == NULL) {
+               return NULL;
+       }
+
+       ctx->refcount = 0;
+       ctx->state = state;
+       ctx->ev = ev;
+       ctx->slot = slot;
+
+       ctx->fdes = talloc_array(ctx, struct tevent_fd *, state->num_watches);
+       if (ctx->fdes == NULL) {
+               goto fail;
+       }
+
+       for (i=0; i<state->num_watches; i++) {
+               struct poll_watch *w = state->watches[i];
+
+               if (w == NULL) {
+                       ctx->fdes[i] = NULL;
+                       continue;
+               }
+               ctx->fdes[i] = tevent_add_fd(ev, ctx->fdes, w->fd, w->events,
+                                            poll_funcs_fde_handler, w);
+               if (ctx->fdes[i] == NULL) {
+                       goto fail;
+               }
+       }
+       talloc_set_destructor(ctx, poll_funcs_tevent_context_destructor);
+       return ctx;
+fail:
+       TALLOC_FREE(ctx);
+       return NULL;
+}
+
+static int poll_funcs_tevent_context_destructor(
+       struct poll_funcs_tevent_context *ctx)
+{
+       ctx->state->contexts[ctx->slot] = NULL;
+       return 0;
+}
+
+static void poll_funcs_fde_handler(struct tevent_context *ev,
+                                  struct tevent_fd *fde, uint16_t flags,
+                                  void *private_data)
+{
+       struct poll_watch *w = talloc_get_type_abort(
+               private_data, struct poll_watch);
+       short events = tevent_to_poll_events(flags);
+       w->callback(w, w->fd, events, w->private_data);
+}
+
+static int poll_funcs_tevent_handle_destructor(
+       struct poll_funcs_tevent_handle *handle);
+
+void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
+                                struct tevent_context *ev)
+{
+       struct poll_funcs_state *state = talloc_get_type_abort(
+               f->private_data, struct poll_funcs_state);
+       struct poll_funcs_tevent_handle *handle;
+       unsigned slot;
+
+       handle = talloc(mem_ctx, struct poll_funcs_tevent_handle);
+       if (handle == NULL) {
+               return NULL;
+       }
+
+       if (!poll_funcs_context_slot_find(state, ev, &slot)) {
+               goto fail;
+       }
+       if (state->contexts[slot] == NULL) {
+               state->contexts[slot] = poll_funcs_tevent_context_new(
+                       state->contexts, state, ev, slot);
+               if (state->contexts[slot] == NULL) {
+                       goto fail;
+               }
+       }
+
+       handle->ctx = state->contexts[slot];
+       handle->ctx->refcount += 1;
+       talloc_set_destructor(handle, poll_funcs_tevent_handle_destructor);
+       return handle;
+fail:
+       TALLOC_FREE(handle);
+       return NULL;
+}
+
+static int poll_funcs_tevent_handle_destructor(
+       struct poll_funcs_tevent_handle *handle)
+{
+       if (handle->ctx->refcount == 0) {
+               abort();
+       }
+       handle->ctx->refcount -= 1;
+
+       if (handle->ctx->refcount != 0) {
+               return 0;
+       }
+       TALLOC_FREE(handle->ctx);
+       return 0;
 }
index 2e677203203ffb24e53ca4e15756d2eea0061e16..8b2964c230d081ded5b93f3a2d12c594df5b117f 100644 (file)
@@ -1,6 +1,6 @@
 /*
  * Unix SMB/CIFS implementation.
- * Copyright (C) Volker Lendecke 2013
+ * Copyright (C) Volker Lendecke 2013,2014
  *
  * This program is free software; you can redistribute it and/or modify
  * it under the terms of the GNU General Public License as published by
 #include "poll_funcs.h"
 #include "tevent.h"
 
-void poll_funcs_init_tevent(struct poll_funcs *f, struct tevent_context *ev);
+/*
+ * Create a new, empty instance of "struct poll_funcs" to be served by tevent.
+ */
+struct poll_funcs *poll_funcs_init_tevent(TALLOC_CTX *mem_ctx);
+
+/*
+ * Register a tevent_context to handle the watches that the user of
+ * "poll_funcs" showed interest in. talloc_free() the returned pointer when
+ * "ev" is not supposed to handle the events anymore.
+ */
+void *poll_funcs_tevent_register(TALLOC_CTX *mem_ctx, struct poll_funcs *f,
+                                struct tevent_context *ev);
 
 #endif
index 6fe8c188367912de5a6396091c3d41fe6b80c304..c2568b6646b2632e829dce3205c35e12c2a625d6 100644 (file)
@@ -16,7 +16,7 @@ static void recv_cb(struct unix_msg_ctx *ctx,
 
 int main(int argc, const char *argv[])
 {
-       struct poll_funcs funcs;
+       struct poll_funcs *funcs;
        const char *sock;
        struct unix_msg_ctx *ctx;
        struct tevent_context *ev;
@@ -37,10 +37,13 @@ int main(int argc, const char *argv[])
                perror("tevent_context_init failed");
                return 1;
        }
-       poll_funcs_init_tevent(&funcs, ev);
+       funcs = poll_funcs_init_tevent(ev);
+       if (funcs == NULL) {
+               fprintf(stderr, "poll_funcs_init_tevent failed\n");
+               return 1;
+       }
 
-       ret = unix_msg_init(sock, &funcs, 256, 1,
-                           recv_cb, &state, &ctx);
+       ret = unix_msg_init(sock, funcs, 256, 1, recv_cb, &state, &ctx);
        if (ret != 0) {
                fprintf(stderr, "unix_msg_init failed: %s\n",
                        strerror(ret));
index bfafee1fd33b193a9b204aa5f2b3da27af164299..94984d88523ba31bc9679fac85eb6d248ac76282 100644 (file)
@@ -5,7 +5,8 @@
 
 int main(int argc, const char *argv[])
 {
-       struct poll_funcs funcs;
+       struct poll_funcs *funcs;
+       void *tevent_handle;
        struct unix_msg_ctx **ctxs;
        struct tevent_context *ev;
        struct iovec iov;
@@ -26,7 +27,16 @@ int main(int argc, const char *argv[])
                perror("tevent_context_init failed");
                return 1;
        }
-       poll_funcs_init_tevent(&funcs, ev);
+       funcs = poll_funcs_init_tevent(NULL);
+       if (funcs == NULL) {
+               fprintf(stderr, "poll_funcs_init_tevent failed\n");
+               return 1;
+       }
+       tevent_handle = poll_funcs_tevent_register(NULL, funcs, ev);
+       if (tevent_handle == NULL) {
+               fprintf(stderr, "poll_funcs_tevent_register failed\n");
+               return 1;
+       }
 
        ctxs = talloc_array(ev, struct unix_msg_ctx *, num_ctxs);
        if (ctxs == NULL) {
@@ -35,7 +45,7 @@ int main(int argc, const char *argv[])
        }
 
        for (i=0; i<num_ctxs; i++) {
-               ret = unix_msg_init(NULL, &funcs, 256, 1, NULL, NULL,
+               ret = unix_msg_init(NULL, funcs, 256, 1, NULL, NULL,
                                    &ctxs[i]);
                if (ret != 0) {
                        fprintf(stderr, "unix_msg_init failed: %s\n",
index 2a4cf862347729b269801942c215f2dca339b9e3..29d5dcb374596a572245cb984c31b08b9a2ece44 100644 (file)
@@ -32,7 +32,8 @@ static void expect_messages(struct tevent_context *ev, struct cb_state *state,
 
 int main(void)
 {
-       struct poll_funcs funcs;
+       struct poll_funcs *funcs;
+       void *tevent_handle;
        const char *sock1 = "sock1";
        const char *sock2 = "sock2";
        struct unix_msg_ctx *ctx1, *ctx2;
@@ -52,9 +53,19 @@ int main(void)
                perror("tevent_context_init failed");
                return 1;
        }
-       poll_funcs_init_tevent(&funcs, ev);
 
-       ret = unix_msg_init(sock1, &funcs, 256, 1,
+       funcs = poll_funcs_init_tevent(ev);
+       if (funcs == NULL) {
+               fprintf(stderr, "poll_funcs_init_tevent failed\n");
+               return 1;
+       }
+       tevent_handle = poll_funcs_tevent_register(ev, funcs, ev);
+       if (tevent_handle == NULL) {
+               fprintf(stderr, "poll_funcs_register_tevent failed\n");
+               return 1;
+       }
+
+       ret = unix_msg_init(sock1, funcs, 256, 1,
                            recv_cb, &state, &ctx1);
        if (ret != 0) {
                fprintf(stderr, "unix_msg_init failed: %s\n",
@@ -62,7 +73,7 @@ int main(void)
                return 1;
        }
 
-       ret = unix_msg_init(sock1, &funcs, 256, 1,
+       ret = unix_msg_init(sock1, funcs, 256, 1,
                            recv_cb, &state, &ctx1);
        if (ret == 0) {
                fprintf(stderr, "unix_msg_init succeeded unexpectedly\n");
@@ -74,7 +85,7 @@ int main(void)
                return 1;
        }
 
-       ret = unix_msg_init(sock2, &funcs, 256, 1,
+       ret = unix_msg_init(sock2, funcs, 256, 1,
                            recv_cb, &state, &ctx2);
        if (ret != 0) {
                fprintf(stderr, "unix_msg_init failed: %s\n",
@@ -201,6 +212,8 @@ int main(void)
 
        unix_msg_free(ctx1);
        unix_msg_free(ctx2);
+       talloc_free(tevent_handle);
+       talloc_free(funcs);
        talloc_free(ev);
 
        return 0;