{
struct tevent_req *req;
struct tdgram_sendto_queue_state *state;
- bool ok;
+ struct tevent_queue_entry *e;
req = tevent_req_create(mem_ctx, &state,
struct tdgram_sendto_queue_state);
state->caller.dst = dst;
state->ret = -1;
- ok = tevent_queue_add(queue,
- ev,
- req,
- tdgram_sendto_queue_trigger,
- NULL);
- if (!ok) {
- tevent_req_oom(req);
- goto post;
+ /*
+ * we use tevent_queue_add_optimize_empty() with allow_direct
+ * in order to optimize for the empty queue case.
+ */
+ e = tevent_queue_add_optimize_empty(
+ queue,
+ ev,
+ req,
+ tdgram_sendto_queue_trigger,
+ NULL);
+ if (tevent_req_nomem(e, req)) {
+ return tevent_req_post(req, ev);
+ }
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
}
return req;
-
- post:
- tevent_req_post(req, ev);
- return req;
}
static void tdgram_sendto_queue_trigger(struct tevent_req *req,
size_t to_read = 0;
size_t i;
struct tevent_req *subreq;
+ bool optimize = false;
+ bool save_optimize = false;
+
+ if (state->count > 0) {
+ /*
+ * This is not the first time we asked for a vector,
+ * which means parts of the pdu already arrived.
+ *
+ * In this case it make sense to enable
+ * a syscall/performance optimization if the
+ * low level tstream implementation supports it.
+ */
+ optimize = true;
+ }
TALLOC_FREE(state->vector);
state->count = 0;
return;
}
+ if (optimize) {
+ /*
+ * If the low level stream is a bsd socket
+ * we will get syscall optimization.
+ *
+ * If it is not a bsd socket
+ * tstream_bsd_optimize_readv() just returns.
+ */
+ save_optimize = tstream_bsd_optimize_readv(state->caller.stream,
+ true);
+ }
subreq = tstream_readv_send(state,
state->caller.ev,
state->caller.stream,
state->vector,
state->count);
+ if (optimize) {
+ tstream_bsd_optimize_readv(state->caller.stream,
+ save_optimize);
+ }
if (tevent_req_nomem(subreq, req)) {
return;
}
{
struct tevent_req *req;
struct tstream_readv_pdu_queue_state *state;
- bool ok;
+ struct tevent_queue_entry *e;
req = tevent_req_create(mem_ctx, &state,
struct tstream_readv_pdu_queue_state);
state->caller.next_vector_private = next_vector_private;
state->ret = -1;
- ok = tevent_queue_add(queue,
- ev,
- req,
- tstream_readv_pdu_queue_trigger,
- NULL);
- if (!ok) {
- tevent_req_oom(req);
- goto post;
+ /*
+ * we use tevent_queue_add_optimize_empty() with allow_direct
+ * in order to optimize for the empty queue case.
+ */
+ e = tevent_queue_add_optimize_empty(
+ queue,
+ ev,
+ req,
+ tstream_readv_pdu_queue_trigger,
+ NULL);
+ if (tevent_req_nomem(e, req)) {
+ return tevent_req_post(req, ev);
+ }
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
}
return req;
-
- post:
- return tevent_req_post(req, ev);
}
static void tstream_readv_pdu_queue_trigger(struct tevent_req *req,
{
struct tevent_req *req;
struct tstream_writev_queue_state *state;
- bool ok;
+ struct tevent_queue_entry *e;
req = tevent_req_create(mem_ctx, &state,
struct tstream_writev_queue_state);
state->caller.count = count;
state->ret = -1;
- ok = tevent_queue_add(queue,
- ev,
- req,
- tstream_writev_queue_trigger,
- NULL);
- if (!ok) {
- tevent_req_oom(req);
- goto post;
+ /*
+ * we use tevent_queue_add_optimize_empty() with allow_direct
+ * in order to optimize for the empty queue case.
+ */
+ e = tevent_queue_add_optimize_empty(
+ queue,
+ ev,
+ req,
+ tstream_writev_queue_trigger,
+ NULL);
+ if (tevent_req_nomem(e, req)) {
+ return tevent_req_post(req, ev);
+ }
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
}
return req;
-
- post:
- return tevent_req_post(req, ev);
}
static void tstream_writev_queue_trigger(struct tevent_req *req,
return ret;
}
+/*
+ * Per-caller context for tstream_readline_send():
+ * carries the preferred read chunk size and any bytes read past the
+ * end of the previous line, which are consumed by the next request.
+ */
+struct tstream_readline_context {
+ size_t chunk_size; /* how many bytes to request per readv */
+ uint8_t *next_buffer; /* leftover bytes after the last terminator, or NULL */
+};
+
+/*
+ * Allocate a readline context on mem_ctx with the given chunk size.
+ * Returns NULL on allocation failure.
+ */
+struct tstream_readline_context *tstream_readline_create_context(TALLOC_CTX *mem_ctx,
+								 size_t chunk_size)
+{
+	struct tstream_readline_context *c;
+
+	c = talloc(mem_ctx, struct tstream_readline_context);
+	if (c == NULL) {
+		return NULL;
+	}
+
+	c->chunk_size = chunk_size;
+	c->next_buffer = NULL;
+	return c;
+}
+
+/* Internal state of one in-flight tstream_readline request. */
+struct tstream_readline_state {
+ /* these structs are owned by the caller */
+ struct {
+ struct tevent_context *ev;
+ struct tstream_context *stream;
+ struct tstream_readline_context *ctx;
+ size_t max_len; /* fail with EMSGSIZE beyond this many bytes */
+ } caller;
+ size_t checked_ofs; /* bytes of current_buffer already scanned for a terminator */
+ uint8_t *current_buffer; /* talloc buffer accumulating the line */
+ struct iovec next_iov; /* vector handed to the pending tstream_readv_send() */
+};
+
+static void tstream_readline_check(struct tevent_req *subreq);
+
+/*
+ * Start an async read of a single line (terminated by '\n' or '\0')
+ * from the given stream.
+ *
+ * mem_ctx - talloc parent for the returned request
+ * ev      - event context driving the reads
+ * stream  - stream to read from
+ * ctx     - readline context (chunk size, buffered leftover bytes)
+ * max_len - maximum accepted line length, EMSGSIZE beyond that
+ *
+ * Returns a tevent_req, or NULL on allocation failure.
+ */
+struct tevent_req *tstream_readline_send(TALLOC_CTX *mem_ctx,
+					 struct tevent_context *ev,
+					 struct tstream_context *stream,
+					 struct tstream_readline_context *ctx,
+					 size_t max_len)
+{
+	struct tevent_req *req;
+	struct tstream_readline_state *state;
+
+	req = tevent_req_create(mem_ctx, &state,
+				struct tstream_readline_state);
+	if (!req) {
+		return NULL;
+	}
+
+	state->caller.ev = ev;
+	state->caller.stream = stream;
+	state->caller.ctx = ctx;
+	state->caller.max_len = max_len;
+	state->checked_ofs = 0;
+	state->current_buffer = NULL;
+
+	/*
+	 * Take ownership of any bytes left over from the previous
+	 * line; they may already contain the next terminator, in
+	 * which case no read is needed at all.
+	 */
+	if (ctx->next_buffer) {
+		state->current_buffer = talloc_move(state,
+						    &ctx->next_buffer);
+	}
+
+	tstream_readline_check(req);
+	if (!tevent_req_is_in_progress(req)) {
+		return tevent_req_post(req, ev);
+	}
+
+	return req;
+}
+
+static void tstream_readline_handler(struct tevent_req *subreq);
+
+/*
+ * Scan the accumulated buffer for a line terminator ('\n' or '\0').
+ * If one is found, stash any bytes beyond it in ctx->next_buffer for
+ * the next request and complete the request; otherwise issue another
+ * tstream_readv_send() for up to chunk_size more bytes.
+ */
+static void tstream_readline_check(struct tevent_req *req)
+{
+	struct tstream_readline_state *state = tevent_req_data(req,
+		struct tstream_readline_state);
+	struct tstream_readline_context *ctx = state->caller.ctx;
+	struct tevent_req *subreq;
+	size_t len = talloc_get_size(state->current_buffer);
+	size_t remaining;
+	size_t nread;
+	ssize_t pending;
+	size_t i;
+	bool end = false;
+
+	for (i = state->checked_ofs; i < len; i++) {
+		state->checked_ofs++;
+
+		if (state->current_buffer[i] == '\n') {
+			end = true;
+			break;
+		}
+		if (state->current_buffer[i] == '\0') {
+			end = true;
+			break;
+		}
+
+		if (state->checked_ofs > state->caller.max_len) {
+			tevent_req_error(req, EMSGSIZE);
+			return;
+		}
+	}
+
+	if (!end) {
+		goto more;
+	}
+
+	/* keep anything after the terminator for the next line */
+	remaining = len - state->checked_ofs;
+
+	if (remaining > 0) {
+		ctx->next_buffer = talloc_array(ctx, uint8_t,
+						remaining);
+		if (tevent_req_nomem(ctx->next_buffer, req)) {
+			return;
+		}
+
+		memcpy(ctx->next_buffer,
+		       state->current_buffer + state->checked_ofs,
+		       remaining);
+
+		/* shrink to the line itself; truncating can't fail */
+		state->current_buffer = talloc_realloc(state,
+						       state->current_buffer,
+						       uint8_t,
+						       state->checked_ofs);
+	}
+
+	tevent_req_done(req);
+	return;
+
+more:
+	/*
+	 * Read only what is already pending (if less than a chunk),
+	 * otherwise ask for a full chunk.
+	 */
+	pending = tstream_pending_bytes(state->caller.stream);
+	if (pending == -1) {
+		tevent_req_error(req, errno);
+		return;
+	}
+
+	if ((pending > 0) && ((size_t)pending < ctx->chunk_size)) {
+		nread = pending;
+	} else {
+		nread = ctx->chunk_size;
+	}
+
+	state->current_buffer = talloc_realloc(state,
+					       state->current_buffer,
+					       uint8_t,
+					       state->checked_ofs + nread);
+	if (tevent_req_nomem(state->current_buffer, req)) {
+		return;
+	}
+
+	state->next_iov.iov_base = state->current_buffer + state->checked_ofs;
+	state->next_iov.iov_len = nread;
+
+	subreq = tstream_readv_send(state,
+				    state->caller.ev,
+				    state->caller.stream,
+				    &state->next_iov, 1);
+	if (tevent_req_nomem(subreq, req)) {
+		return;
+	}
+	tevent_req_set_callback(subreq, tstream_readline_handler, req);
+}
+
+/*
+ * Completion callback for the subordinate tstream_readv request:
+ * propagate errors, otherwise rescan the buffer for a terminator.
+ */
+static void tstream_readline_handler(struct tevent_req *subreq)
+{
+	struct tevent_req *req = tevent_req_callback_data(subreq,
+		struct tevent_req);
+	int ret;
+	int sys_errno;
+
+	ret = tstream_readv_recv(subreq, &sys_errno);
+	TALLOC_FREE(subreq);
+	if (ret == -1) {
+		tevent_req_error(req, sys_errno);
+		return;
+	}
+
+	/* got more bytes, look for the terminator again */
+	tstream_readline_check(req);
+}
+
+/*
+ * Collect the result of tstream_readline_send().
+ *
+ * On success, ownership of the line buffer moves to mem_ctx via
+ * *line and the scanned length is returned.
+ *
+ * NOTE(review): the returned length counts the '\n'/'\0' terminator
+ * (checked_ofs is incremented past it) - confirm callers expect that.
+ * NOTE(review): when the line ended in '\n' the buffer is not
+ * NUL-terminated, so treating *line as a C string needs verifying.
+ * On failure -1 is returned, *perrno is set and *line is untouched.
+ */
+ssize_t tstream_readline_recv(struct tevent_req *req,
+ int *perrno,
+ TALLOC_CTX *mem_ctx,
+ const char **line)
+{
+ struct tstream_readline_state *state = tevent_req_data(req,
+ struct tstream_readline_state);
+ ssize_t ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ uint8_t *l;
+ /* hand the buffer to the caller */
+ l = talloc_move(mem_ctx, &state->current_buffer);
+ *line = (char *)l;
+ ret = state->checked_ofs;
+ }
+
+ tevent_req_received(req);
+ return ret;
+}
+