return ret;
}
+struct tstream_readline_context {
+ size_t chunk_size;
+ uint8_t *next_buffer;
+};
+
+struct tstream_readline_context *tstream_readline_create_context(TALLOC_CTX *mem_ctx,
+ size_t chunk_size)
+{
+ struct tstream_readline_context *ctx;
+
+ ctx = talloc(mem_ctx, struct tstream_readline_context);
+ if (ctx == NULL) {
+ return NULL;
+ }
+
+ ctx->chunk_size = chunk_size;
+ ctx->next_buffer = NULL;
+
+ return ctx;
+}
+
+struct tstream_readline_state {
+ /* this structs are owned by the caller */
+ struct {
+ struct tevent_context *ev;
+ struct tstream_context *stream;
+ struct tstream_readline_context *ctx;
+ size_t max_len;
+ } caller;
+ size_t checked_ofs;
+ uint8_t *current_buffer;
+ struct iovec next_iov;
+};
+
+static void tstream_readline_check(struct tevent_req *subreq);
+
+struct tevent_req *tstream_readline_send(TALLOC_CTX *mem_ctx,
+ struct tevent_context *ev,
+ struct tstream_context *stream,
+ struct tstream_readline_context *ctx,
+ size_t max_len)
+{
+ struct tevent_req *req;
+ struct tstream_readline_state *state;
+ bool ok;
+
+ req = tevent_req_create(mem_ctx, &state,
+ struct tstream_readline_state);
+ if (!req) {
+ return NULL;
+ }
+
+ state->caller.ev = ev;
+ state->caller.stream = stream;
+ state->caller.ctx = ctx;
+ state->caller.max_len = max_len;
+ state->checked_ofs = 0;
+ state->current_buffer = NULL;
+
+ if (ctx->next_buffer) {
+ state->current_buffer = talloc_move(state,
+ &ctx->next_buffer);
+ }
+
+ tstream_readline_check(req);
+ if (!tevent_req_is_in_progress(req)) {
+ return tevent_req_post(req, ev);
+ }
+
+ return req;
+}
+
+static void tstream_readline_handler(struct tevent_req *subreq);
+
+static void tstream_readline_check(struct tevent_req *req)
+{
+ struct tstream_readline_state *state = tevent_req_data(req,
+ struct tstream_readline_state);
+ struct tstream_readline_context *ctx = state->caller.ctx;
+ size_t len = talloc_get_size(state->current_buffer);
+ bool end = false;
+
+ for (i=state->checked_ofs; i < len; i++) {
+ state->checked_ofs++;
+
+ if (state->current_buffer[i] == '\n') {
+ end = true;
+ break;
+ }
+ if (state->current_buffer[i] == '\0') {
+ end = true;
+ break;
+ }
+
+ if (state->checked_ofs > state->caller.max_len) {
+ tevent_req_error(req, EMSGSIZE);
+ return;
+ }
+ }
+
+ if (!end) {
+ goto more;
+ }
+
+ remaining = len - state->checked_ofs;
+
+ if (remaining > 0 ) {
+ ctx->next_buffer = talloc_array(ctx, uint8_t,
+ remaining);
+ if (tevent_req_nomem(ctx->next_buffer, req)) {
+ return;
+ }
+
+ memcpy(ctx->next_buffer,
+ state->current_buffer + state->checked_ofs,
+ remaining);
+
+ /* truncating can't fail */
+ talloc_realloc(state, state->current_buffer,
+ uint8_t, state->checked_ofs);
+ }
+
+ tevent_req_done(req);
+ return;
+
+more:
+
+ pending = tstream_pending_bytes(stream);
+ if (pending == -1) {
+ tevent_req_error(req, errno);
+ return;
+ }
+
+ if ((pending > 0) && (pending < ctx->chunk_size)) {
+ nread = pending;
+ } else {
+ nread = ctx->chunk_size;
+ }
+
+ state->current_buffer = talloc_realloc(state,
+ state->current_buffer,
+ uint8_t,
+ state->checked_ofs + nread);
+ if (tevent_req_nomem(state->current_buffer, req)) {
+ return;
+ }
+
+ state->next_iov.iov_base = state->current_buffer + state->checked_ofs;
+ state->next_iov.iov_len = nread;
+
+ subreq = tstream_readv_send(state,
+ state->caller.ev,
+ state->caller.stream,
+ &state->next_iov, 1);
+ if (tevent_req_nomem(subreq, req)) {
+ return;
+ }
+ tevent_req_set_callback(subreq, tstream_readline_handler, req);
+}
+
+static void tstream_readline_handler(struct tevent_req *subreq)
+{
+ struct tevent_req *req = tevent_req_callback_data(subreq,
+ struct tevent_req);
+ struct tstream_readline_state *state = tevent_req_data(req,
+ struct tstream_readline_state);
+ struct tstream_readline_context *ctx = state->caller.ctx;
+ int ret;
+ int sys_errno;
+
+ ret = tstream_readv_recv(subreq, &sys_errno);
+ TALLOC_FREE(subreq);
+ if (ret == -1) {
+ tevent_req_error(req, sys_errno);
+ return;
+ }
+
+ tstream_readline_check(req);
+}
+
+ssize_t tstream_readline_recv(struct tevent_req *req,
+ int *perrno,
+ TALLOC_CTX *mem_ctx,
+ const char **line)
+{
+ struct tstream_readline_state *state = tevent_req_data(req,
+ struct tstream_readline_state);
+ ssize_t ret;
+
+ ret = tsocket_simple_int_recv(req, perrno);
+ if (ret == 0) {
+ uint8_t *l;
+ l = talloc_move(mem_ctx, &state->current_buffer);
+ *line = (char *)l;
+ ret = state->checked_ofs;
+ }
+
+ tevent_req_received(req);
+ return ret;
+}
+