messaging3: Add messaging_read_send/recv
authorVolker Lendecke <vl@samba.org>
Mon, 30 Dec 2013 10:26:52 +0000 (11:26 +0100)
committerVolker Lendecke <vl@samba.org>
Tue, 21 Jan 2014 07:10:41 +0000 (08:10 +0100)
This is made to replace the msg_channel abstraction.

msg_channel was created to not miss any messages. For this, some
complex queueing was installed. This complexity has caused quite a
few problems in the past (see bug 10284 for example).

messaging_read_send/recv is able to achieve the same goal with a
lot less complexity. The messaging_read_send atomically installs
the reader into the messaging_context, we will not miss any messages
while this installed. messaging_send_recv will deinstall that
listener, but in the callback function you can directly call
messaging_read_send again without going through the tevent_loop_once.
As long as this is always made sure, no messages will be lost.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/include/messages.h
source3/lib/dbwrap/dbwrap_watch.c
source3/lib/messages.c

index cefa2793fd5788b21e7500630f4d830167cb301b..27b315642c6a9ca8cb7584b0ca496d5d11f5b931 100644 (file)
@@ -80,6 +80,9 @@ struct messaging_context {
        struct tevent_context *event_ctx;
        struct messaging_callback *callbacks;
 
+       struct tevent_req **waiters;
+       unsigned num_waiters;
+
        struct messaging_backend *local;
        struct messaging_backend *remote;
 };
@@ -140,6 +143,13 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                            struct messaging_rec *rec);
 
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      struct messaging_context *msg,
+                                      uint32_t msg_type);
+int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                       struct messaging_rec **presult);
+
 #include "librpc/gen_ndr/ndr_messaging.h"
 
 #endif
index 7bdcd998f89c6326147c7a433784f75164eca0b6..e65dbf41bdea52eb1edf146e2f610a5064e1f11c 100644 (file)
@@ -22,7 +22,6 @@
 #include "dbwrap/dbwrap.h"
 #include "dbwrap_watch.h"
 #include "dbwrap_open.h"
-#include "msg_channel.h"
 #include "lib/util/util_tdb.h"
 #include "lib/util/tevent_ntstatus.h"
 
index ba473ae8ae284b35f7a9d0cbc71e0e50801230d3..58f45d3b1cf03617094d4692ae8867b80b24388d 100644 (file)
@@ -49,6 +49,7 @@
 #include "dbwrap/dbwrap.h"
 #include "serverid.h"
 #include "messages.h"
+#include "lib/util/tevent_unix.h"
 
 struct messaging_callback {
        struct messaging_callback *prev, *next;
@@ -425,6 +426,120 @@ NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
        return messaging_send(msg_ctx, server, msg_type, &blob);
 }
 
+static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
+                                              struct messaging_rec *rec)
+{
+       struct messaging_rec *result;
+
+       result = talloc_pooled_object(mem_ctx, struct messaging_rec,
+                                     1, rec->buf.length);
+       if (result == NULL) {
+               return NULL;
+       }
+       *result = *rec;
+
+       /* Doesn't fail, see talloc_pooled_object */
+
+       result->buf.data = talloc_memdup(result, rec->buf.data,
+                                        rec->buf.length);
+       return result;
+}
+
+struct messaging_read_state {
+       struct tevent_context *ev;
+       struct messaging_context *msg_ctx;
+       uint32_t msg_type;
+       struct messaging_rec *rec;
+};
+
+static void messaging_read_cleanup(struct tevent_req *req,
+                                  enum tevent_req_state req_state);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      struct messaging_context *msg_ctx,
+                                      uint32_t msg_type)
+{
+       struct tevent_req *req;
+       struct messaging_read_state *state;
+       size_t waiters_len;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct messaging_read_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->ev = ev;
+       state->msg_ctx = msg_ctx;
+       state->msg_type = msg_type;
+
+       waiters_len = talloc_array_length(msg_ctx->waiters);
+
+       if (waiters_len == msg_ctx->num_waiters) {
+               struct tevent_req **tmp;
+
+               tmp = talloc_realloc(msg_ctx, msg_ctx->waiters,
+                                    struct tevent_req *, waiters_len+1);
+               if (tevent_req_nomem(tmp, req)) {
+                       return tevent_req_post(req, ev);
+               }
+               msg_ctx->waiters = tmp;
+       }
+
+       msg_ctx->waiters[msg_ctx->num_waiters] = req;
+       msg_ctx->num_waiters += 1;
+       tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+
+       return req;
+}
+
+static void messaging_read_cleanup(struct tevent_req *req,
+                                  enum tevent_req_state req_state)
+{
+       struct messaging_read_state *state = tevent_req_data(
+               req, struct messaging_read_state);
+       struct messaging_context *msg_ctx = state->msg_ctx;
+       struct tevent_req **waiters = msg_ctx->waiters;
+       unsigned i;
+
+       tevent_req_set_cleanup_fn(req, NULL);
+
+       for (i=0; i<msg_ctx->num_waiters; i++) {
+               if (waiters[i] == req) {
+                       waiters[i] = waiters[msg_ctx->num_waiters-1];
+                       msg_ctx->num_waiters -= 1;
+                       return;
+               }
+       }
+}
+
+static void messaging_read_done(struct tevent_req *req, struct messaging_rec *rec)
+{
+       struct messaging_read_state *state = tevent_req_data(
+               req, struct messaging_read_state);
+
+       state->rec = messaging_rec_dup(state, rec);
+       if (tevent_req_nomem(state->rec, req)) {
+               return;
+       }
+       tevent_req_done(req);
+}
+
+int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                       struct messaging_rec **presult)
+{
+       struct messaging_read_state *state = tevent_req_data(
+               req, struct messaging_read_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               tevent_req_received(req);
+               return err;
+       }
+       *presult = talloc_move(mem_ctx, &state->rec);
+       return 0;
+}
+
 /*
   Dispatch one messaging_rec
 */
@@ -432,6 +547,7 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                            struct messaging_rec *rec)
 {
        struct messaging_callback *cb, *next;
+       unsigned i;
 
        for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
                next = cb->next;
@@ -445,6 +561,16 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                           the same message type */
                }
        }
+
+       for (i=0; i<msg_ctx->num_waiters; i++) {
+               struct tevent_req *req = msg_ctx->waiters[i];
+               struct messaging_read_state *state = tevent_req_data(
+                       req, struct messaging_read_state);
+
+               if (state->msg_type == rec->msg_type) {
+                       messaging_read_done(req, rec);
+               }
+       }
        return;
 }