messaging3: Add comments about not touching "waiters"
[mat/samba.git] / source3 / lib / messages.c
index 9354ac8330a8c19c4c873c80c4bc19d0007cbda8..6a08531f2a86356e9ae63dc2451def8795c931de 100644 (file)
@@ -458,79 +458,105 @@ static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
        return result;
 }
 
-struct messaging_read_state {
+struct messaging_filtered_read_state {
        struct tevent_context *ev;
        struct messaging_context *msg_ctx;
-       uint32_t msg_type;
+
+       bool (*filter)(struct messaging_rec *rec, void *private_data);
+       void *private_data;
+
        struct messaging_rec *rec;
 };
 
-static void messaging_read_cleanup(struct tevent_req *req,
-                                  enum tevent_req_state req_state);
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+                                           enum tevent_req_state req_state);
 
-struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
-                                      struct tevent_context *ev,
-                                      struct messaging_context *msg_ctx,
-                                      uint32_t msg_type)
+struct tevent_req *messaging_filtered_read_send(
+       TALLOC_CTX *mem_ctx, struct tevent_context *ev,
+       struct messaging_context *msg_ctx,
+       bool (*filter)(struct messaging_rec *rec, void *private_data),
+       void *private_data)
 {
        struct tevent_req *req;
-       struct messaging_read_state *state;
-       size_t waiters_len;
+       struct messaging_filtered_read_state *state;
+       size_t new_waiters_len;
 
        req = tevent_req_create(mem_ctx, &state,
-                               struct messaging_read_state);
+                               struct messaging_filtered_read_state);
        if (req == NULL) {
                return NULL;
        }
        state->ev = ev;
        state->msg_ctx = msg_ctx;
-       state->msg_type = msg_type;
+       state->filter = filter;
+       state->private_data = private_data;
 
-       waiters_len = talloc_array_length(msg_ctx->waiters);
+       /*
+        * We add ourselves to the "new_waiters" array, not the "waiters"
+        * array. If we are called from within messaging_read_done,
+        * messaging_dispatch_rec will be in an active for-loop on
+        * "waiters". We must be careful not to mess with this array, because
+        * it could mean that a single event is being delivered twice.
+        */
 
-       if (waiters_len == msg_ctx->num_waiters) {
+       new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
+
+       if (new_waiters_len == msg_ctx->num_new_waiters) {
                struct tevent_req **tmp;
 
-               tmp = talloc_realloc(msg_ctx, msg_ctx->waiters,
-                                    struct tevent_req *, waiters_len+1);
+               tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
+                                    struct tevent_req *, new_waiters_len+1);
                if (tevent_req_nomem(tmp, req)) {
                        return tevent_req_post(req, ev);
                }
-               msg_ctx->waiters = tmp;
+               msg_ctx->new_waiters = tmp;
        }
 
-       msg_ctx->waiters[msg_ctx->num_waiters] = req;
-       msg_ctx->num_waiters += 1;
-       tevent_req_set_cleanup_fn(req, messaging_read_cleanup);
+       msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
+       msg_ctx->num_new_waiters += 1;
+       tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
 
        return req;
 }
 
-static void messaging_read_cleanup(struct tevent_req *req,
-                                  enum tevent_req_state req_state)
+static void messaging_filtered_read_cleanup(struct tevent_req *req,
+                                           enum tevent_req_state req_state)
 {
-       struct messaging_read_state *state = tevent_req_data(
-               req, struct messaging_read_state);
+       struct messaging_filtered_read_state *state = tevent_req_data(
+               req, struct messaging_filtered_read_state);
        struct messaging_context *msg_ctx = state->msg_ctx;
-       struct tevent_req **waiters = msg_ctx->waiters;
        unsigned i;
 
        tevent_req_set_cleanup_fn(req, NULL);
 
+       /*
+        * Just set the [new_]waiters entry to NULL, be careful not to mess
+        * with the other "waiters" array contents. We are often called from
+        * within "messaging_dispatch_rec", which loops over
+        * "waiters". Messing with the "waiters" array will mess up that
+        * for-loop.
+        */
+
        for (i=0; i<msg_ctx->num_waiters; i++) {
-               if (waiters[i] == req) {
-                       waiters[i] = waiters[msg_ctx->num_waiters-1];
-                       msg_ctx->num_waiters -= 1;
+               if (msg_ctx->waiters[i] == req) {
+                       msg_ctx->waiters[i] = NULL;
+                       return;
+               }
+       }
+
+       for (i=0; i<msg_ctx->num_new_waiters; i++) {
+               if (msg_ctx->new_waiters[i] == req) {
+                       msg_ctx->new_waiters[i] = NULL;
                        return;
                }
        }
 }
 
-static void messaging_read_done(struct tevent_req *req,
-                               struct messaging_rec *rec)
+static void messaging_filtered_read_done(struct tevent_req *req,
+                                        struct messaging_rec *rec)
 {
-       struct messaging_read_state *state = tevent_req_data(
-               req, struct messaging_read_state);
+       struct messaging_filtered_read_state *state = tevent_req_data(
+               req, struct messaging_filtered_read_state);
 
        state->rec = messaging_rec_dup(state, rec);
        if (tevent_req_nomem(state->rec, req)) {
@@ -539,6 +565,79 @@ static void messaging_read_done(struct tevent_req *req,
        tevent_req_done(req);
 }
 
+int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
+                                struct messaging_rec **presult)
+{
+       struct messaging_filtered_read_state *state = tevent_req_data(
+               req, struct messaging_filtered_read_state);
+       int err;
+
+       if (tevent_req_is_unix_error(req, &err)) {
+               tevent_req_received(req);
+               return err;
+       }
+       *presult = talloc_move(mem_ctx, &state->rec);
+       return 0;
+}
+
+struct messaging_read_state {
+       uint32_t msg_type;
+       struct messaging_rec *rec;
+};
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+                                 void *private_data);
+static void messaging_read_done(struct tevent_req *subreq);
+
+struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
+                                      struct tevent_context *ev,
+                                      struct messaging_context *msg,
+                                      uint32_t msg_type)
+{
+       struct tevent_req *req, *subreq;
+       struct messaging_read_state *state;
+
+       req = tevent_req_create(mem_ctx, &state,
+                               struct messaging_read_state);
+       if (req == NULL) {
+               return NULL;
+       }
+       state->msg_type = msg_type;
+
+       subreq = messaging_filtered_read_send(state, ev, msg,
+                                             messaging_read_filter, state);
+       if (tevent_req_nomem(subreq, req)) {
+               return tevent_req_post(req, ev);
+       }
+       tevent_req_set_callback(subreq, messaging_read_done, req);
+       return req;
+}
+
+static bool messaging_read_filter(struct messaging_rec *rec,
+                                 void *private_data)
+{
+       struct messaging_read_state *state = talloc_get_type_abort(
+               private_data, struct messaging_read_state);
+
+       return rec->msg_type == state->msg_type;
+}
+
+static void messaging_read_done(struct tevent_req *subreq)
+{
+       struct tevent_req *req = tevent_req_callback_data(
+               subreq, struct tevent_req);
+       struct messaging_read_state *state = tevent_req_data(
+               req, struct messaging_read_state);
+       int ret;
+
+       ret = messaging_filtered_read_recv(subreq, state, &state->rec);
+       TALLOC_FREE(subreq);
+       if (tevent_req_error(req, ret)) {
+               return;
+       }
+       tevent_req_done(req);
+}
+
 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
                        struct messaging_rec **presult)
 {
@@ -547,13 +646,42 @@ int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
        int err;
 
        if (tevent_req_is_unix_error(req, &err)) {
-               tevent_req_received(req);
                return err;
        }
-       *presult = talloc_move(mem_ctx, &state->rec);
+       if (presult != NULL) {
+               *presult = talloc_move(mem_ctx, &state->rec);
+       }
        return 0;
 }
 
+static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
+{
+       if (msg_ctx->num_new_waiters == 0) {
+               return true;
+       }
+
+       if (talloc_array_length(msg_ctx->waiters) <
+           (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
+               struct tevent_req **tmp;
+               tmp = talloc_realloc(
+                       msg_ctx, msg_ctx->waiters, struct tevent_req *,
+                       msg_ctx->num_waiters + msg_ctx->num_new_waiters);
+               if (tmp == NULL) {
+                       DEBUG(1, ("%s: talloc failed\n", __func__));
+                       return false;
+               }
+               msg_ctx->waiters = tmp;
+       }
+
+       memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
+              sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
+
+       msg_ctx->num_waiters += msg_ctx->num_new_waiters;
+       msg_ctx->num_new_waiters = 0;
+
+       return true;
+}
+
 /*
   Dispatch one messaging_rec
 */
@@ -576,14 +704,40 @@ void messaging_dispatch_rec(struct messaging_context *msg_ctx,
                }
        }
 
-       for (i=0; i<msg_ctx->num_waiters; i++) {
-               struct tevent_req *req = msg_ctx->waiters[i];
-               struct messaging_read_state *state = tevent_req_data(
-                       req, struct messaging_read_state);
+       if (!messaging_append_new_waiters(msg_ctx)) {
+               return;
+       }
+
+       i = 0;
+       while (i < msg_ctx->num_waiters) {
+               struct tevent_req *req;
+               struct messaging_filtered_read_state *state;
+
+               req = msg_ctx->waiters[i];
+               if (req == NULL) {
+                       /*
+                        * This got cleaned up. In the meantime,
+                        * move everything down one. We need
+                        * to keep the order of waiters, as
+                        * other code may depend on this.
+                        */
+                       if (i <  msg_ctx->num_waiters - 1) {
+                               memmove(&msg_ctx->waiters[i],
+                                       &msg_ctx->waiters[i+1],
+                                       sizeof(struct tevent_req *) *
+                                           (msg_ctx->num_waiters - i - 1));
+                       }
+                       msg_ctx->num_waiters -= 1;
+                       continue;
+               }
 
-               if (state->msg_type == rec->msg_type) {
-                       messaging_read_done(req, rec);
+               state = tevent_req_data(
+                       req, struct messaging_filtered_read_state);
+               if (state->filter(rec, state->private_data)) {
+                       messaging_filtered_read_done(req, rec);
                }
+
+               i += 1;
        }
        return;
 }
@@ -597,7 +751,8 @@ bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
 
        req = background_job_send(
                msg, msg->event_ctx, msg, NULL, 0,
-               lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+               lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+                           60*15),
                mess_parent_dgm_cleanup, msg);
        if (req == NULL) {
                return false;
@@ -614,7 +769,8 @@ static int mess_parent_dgm_cleanup(void *private_data)
 
        status = messaging_dgm_wipe(msg_ctx);
        DEBUG(10, ("messaging_dgm_wipe returned %s\n", nt_errstr(status)));
-       return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15);
+       return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+                          60*15);
 }
 
 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
@@ -625,11 +781,13 @@ static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
 
        status = background_job_recv(req);
        TALLOC_FREE(req);
-       DEBUG(1, ("messaging dgm cleanup job ended with %s\n", nt_errstr(status)));
+       DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
+                 nt_errstr(status)));
 
        req = background_job_send(
                msg, msg->event_ctx, msg, NULL, 0,
-               lp_parm_int(-1, "messaging", "messaging dgm cleanup interval", 60*15),
+               lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
+                           60*15),
                mess_parent_dgm_cleanup, msg);
        if (req == NULL) {
                DEBUG(1, ("background_job_send failed\n"));