static void msg_channel_init_got_msg(struct messaging_context *msg,
void *priv, uint32_t msg_type,
struct server_id server_id, DATA_BLOB *data);
-static void msg_channel_trigger(struct tevent_context *ev,
- struct tevent_immediate *im,
- void *priv);
static int msg_channel_destructor(struct msg_channel *s);
struct tevent_req *msg_channel_init_send(TALLOC_CTX *mem_ctx,
return err;
}
+struct msg_read_state {
+ struct tevent_context *ev;
+ struct msg_channel *channel;
+ struct messaging_rec *rec;
+};
+
static void msg_channel_init_got_msg(struct messaging_context *msg,
void *priv, uint32_t msg_type,
struct server_id server_id,
struct messaging_rec *rec;
struct messaging_rec **msgs;
size_t num_msgs;
- struct tevent_immediate *im;
rec = talloc(s, struct messaging_rec);
if (rec == NULL) {
}
rec->buf.length = data->length;
+ if (s->pending_req != NULL) {
+ struct tevent_req *req = s->pending_req;
+ struct msg_read_state *state = tevent_req_data(
+ req, struct msg_read_state);
+
+ s->pending_req = NULL;
+
+ state->rec = talloc_move(state, &rec);
+ tevent_req_defer_callback(req, s->ev);
+ tevent_req_done(req);
+ return;
+ }
+
num_msgs = talloc_array_length(s->msgs);
msgs = talloc_realloc(s, s->msgs, struct messaging_rec *, num_msgs+1);
if (msgs == NULL) {
s->msgs = msgs;
s->msgs[num_msgs] = talloc_move(s->msgs, &rec);
- if (s->pending_req == NULL) {
- return;
- }
-
- im = tevent_create_immediate(s);
- if (im == NULL) {
- goto fail;
- }
- tevent_schedule_immediate(im, s->ev, msg_channel_trigger, s);
return;
fail:
TALLOC_FREE(rec);
}
-struct msg_read_state {
- struct tevent_context *ev;
- struct tevent_req *req;
- struct msg_channel *channel;
- struct messaging_rec *rec;
-};
-
-static int msg_read_state_destructor(struct msg_read_state *s);
static void msg_read_got_ctdb(struct tevent_req *subreq);
struct tevent_req *msg_read_send(TALLOC_CTX *mem_ctx,
struct msg_channel *channel)
{
struct tevent_req *req;
- struct tevent_immediate *im;
struct msg_read_state *state;
void *msg_tdb_event;
size_t num_msgs;
return NULL;
}
state->ev = ev;
- state->req = req;
state->channel = channel;
if (channel->pending_req != NULL) {
tevent_req_error(req, EBUSY);
return tevent_req_post(req, ev);
}
- channel->pending_req = req;
- channel->ev = ev;
- talloc_set_destructor(state, msg_read_state_destructor);
num_msgs = talloc_array_length(channel->msgs);
if (num_msgs != 0) {
- im = tevent_create_immediate(channel);
- if (tevent_req_nomem(im, req)) {
- return tevent_req_post(req, ev);
- }
- tevent_schedule_immediate(im, channel->ev, msg_channel_trigger,
- channel);
- return req;
+ state->rec = talloc_move(state, &channel->msgs[0]);
+ memmove(channel->msgs, channel->msgs+1,
+ sizeof(struct messaging_rec *) * (num_msgs-1));
+ channel->msgs = talloc_realloc(
+ channel, channel->msgs, struct messaging_rec *,
+ num_msgs - 1);
+ tevent_req_done(req);
+ return tevent_req_post(req, ev);
}
+ channel->pending_req = req;
+ channel->ev = ev;
+
msg_tdb_event = messaging_tdb_event(state, channel->msg, ev);
if (tevent_req_nomem(msg_tdb_event, req)) {
return tevent_req_post(req, ev);
return req;
}
-static int msg_read_state_destructor(struct msg_read_state *s)
-{
- assert(s->channel->pending_req == s->req);
- s->channel->pending_req = NULL;
- return 0;
-}
-
-static void msg_channel_trigger(struct tevent_context *ev,
- struct tevent_immediate *im,
- void *priv)
-{
- struct msg_channel *channel;
- struct tevent_req *req;
- struct msg_read_state *state;
- size_t num_msgs;
-
- channel = talloc_get_type_abort(priv, struct msg_channel);
- req = channel->pending_req;
- state = tevent_req_data(req, struct msg_read_state);
-
- talloc_set_destructor(state, NULL);
- msg_read_state_destructor(state);
-
- num_msgs = talloc_array_length(channel->msgs);
- assert(num_msgs > 0);
-
- state->rec = talloc_move(state, &channel->msgs[0]);
-
- memmove(channel->msgs, channel->msgs+1,
- sizeof(struct messaging_rec *) * (num_msgs-1));
- channel->msgs = talloc_realloc(
- channel, channel->msgs, struct messaging_rec *, num_msgs - 1);
-
- tevent_req_done(req);
-}
-
static void msg_read_got_ctdb(struct tevent_req *subreq)
{
struct tevent_req *req = tevent_req_callback_data(
return err;
}
*prec = talloc_move(mem_ctx, &state->rec);
+ tevent_req_received(req);
return 0;
}