messaging3: Add messaging_filtered_read
authorVolker Lendecke <vl@samba.org>
Thu, 24 Apr 2014 09:05:53 +0000 (09:05 +0000)
committerVolker Lendecke <vl@samba.org>
Thu, 8 May 2014 07:10:12 +0000 (09:10 +0200)
This delegates the decision whether to read a message to a callback

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Stefan Metzmacher <metze@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/include/messages.h
source3/lib/messages.c

index 06c174833cd5ddb5933135027c61cf316f78c63e..7801dfb3d7053eb9f0ce2c2031d2bf8a1bec664b 100644 (file)
@@ -142,6 +142,14 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                            struct messaging_rec *rec);
 
+struct tevent_req *messaging_filtered_read_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct messaging_context *msg_ctx,
+       bool (*filter)(struct messaging_rec *rec, void *private_data),
+       void *private_data);
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                                struct messaging_rec **presult);
+
 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
                                       struct tevent_context *ev,
                                       struct messaging_context *msg,
index 9284ac132ab7132313c51f6f5e7d18328c9e8c4e..ca254a4cfea9362ef131302c94e975e2e4fda72d 100644 (file)
@@ -458,33 +458,38 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
        return result;
 }
 
-struct messaging_read_state {
+struct messaging_filtered_read_state {
        struct tevent_context *ev;
        struct messaging_context *msg_ctx;
-       uint32_t msg_type;
+
+       bool (*filter)(struct messaging_rec *rec, void *private_data);
+       void *private_data;
+
        struct messaging_rec *rec;
 };
 
-static void messaging_read_cleanup(struct tevent_req *req,
-                                  enum tevent_req_state req_state);
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+                                           enum tevent_req_state req_state);
 
-struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
-                                      struct tevent_context *ev,
-                                      struct messaging_context *msg_ctx,
-                                      uint32_t msg_type)
+struct tevent_req *messaging_filtered_read_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct messaging_context *msg_ctx,
+       bool (*filter)(struct messaging_rec *rec, void *private_data),
+       void *private_data)
 {
        struct tevent_req *req;
-       struct messaging_read_state *state;
+       struct messaging_filtered_read_state *state;
        size_t new_waiters_len;
 
        req = tevent_req_create(mem_ctx, &state,
-                               struct messaging_read_state);
+                               struct messaging_filtered_read_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->msg_ctx = msg_ctx;
-       state->msg_type = msg_type;
+       state->filter = filter;
+       state->private_data = private_data;
 
        new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
 
@@ -501,16 +506,16 @@ struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
 
        msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
        msg_ctx->num_new_waiters += 1;
-       tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+       tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
 
        return req;
 }
 
-static void messaging_read_cleanup(struct tevent_req *req,
-                                  enum tevent_req_state req_state)
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+                                           enum tevent_req_state req_state)
 {
-       struct messaging_read_state *state = tevent_req_data(
-               req, struct messaging_read_state);
+       struct messaging_filtered_read_state *state = tevent_req_data(
+               req, struct messaging_filtered_read_state);
        struct messaging_context *msg_ctx = state->msg_ctx;
        unsigned i;
 
@@ -531,11 +536,11 @@ static void messaging_read_cleanup(struct tevent_req *req,
        }
 }
 
-static void messaging_read_done(struct tevent_req *req,
-                               struct messaging_rec *rec)
+static void messaging_filtered_read_done(struct tevent_req *req,
+                                        struct messaging_rec *rec)
 {
-       struct messaging_read_state *state = tevent_req_data(
-               req, struct messaging_read_state);
+       struct messaging_filtered_read_state *state = tevent_req_data(
+               req, struct messaging_filtered_read_state);
 
        state->rec = messaging_rec_dup(state, rec);
        if (tevent_req_nomem(state->rec, req)) {
@@ -544,6 +549,79 @@ static void messaging_read_done(struct tevent_req *req,
        tevent_req_done(req);
 }
 
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                                struct messaging_rec **presult)
+{
+       struct messaging_filtered_read_state *state = tevent_req_data(
+               req, struct messaging_filtered_read_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               tevent_req_received(req);
+               return err;
+       }
+       *presult = talloc_move(mem_ctx, &state->rec);
+       return 0;
+}
+
+struct messaging_read_state {
+       uint32_t msg_type;
+       struct messaging_rec *rec;
+};
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+                                 void *private_data);
+static void messaging_read_done(struct tevent_req *subreq);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      struct messaging_context *msg,
+                                      uint32_t msg_type)
+{
+       struct tevent_req *req, *subreq;
+       struct messaging_read_state *state;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct messaging_read_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->msg_type = msg_type;
+
+       subreq = messaging_filtered_read_send(state, ev, msg,
+                                             messaging_read_filter, state);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, messaging_read_done, req);
+       return req;
+}
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+                                 void *private_data)
+{
+       struct messaging_read_state *state = talloc_get_type_abort(
+               private_data, struct messaging_read_state);
+
+       return rec->msg_type == state->msg_type;
+}
+
+static void messaging_read_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct messaging_read_state *state = tevent_req_data(
+               req, struct messaging_read_state);
+       int ret;
+
+       ret = messaging_filtered_read_recv(subreq, state, &state->rec);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       tevent_req_done(req);
+}
+
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
                        struct messaging_rec **presult)
 {
@@ -552,7 +630,6 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
        int err;
 
        if (tevent_req_is_unix_error(req, &err)) {
-               tevent_req_received(req);
                return err;
        }
        if (presult != NULL) {
@@ -618,7 +695,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
        i = 0;
        while (i < msg_ctx->num_waiters) {
                struct tevent_req *req;
-               struct messaging_read_state *state;
+               struct messaging_filtered_read_state *state;
 
                req = msg_ctx->waiters[i];
                if (req == NULL) {
@@ -638,9 +715,10 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                        continue;
                }
 
-               state = tevent_req_data(req, struct messaging_read_state);
-               if (state->msg_type == rec->msg_type) {
-                       messaging_read_done(req, rec);
+               state = tevent_req_data(
+                       req, struct messaging_filtered_read_state);
+               if (state->filter(rec, state->private_data)) {
+                       messaging_filtered_read_done(req, rec);
                }
 
                i += 1;