4 Copyright (C) Amitay Isaacs 2015
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
25 #include "common/pkt_read.c"
26 #include "common/pkt_write.c"
/*
 * NOTE(review): the next two lines look like surviving members of the
 * writer state structure (writer_send() reads state->ev and sets
 * state->subreq). The struct header and the remaining members are not
 * visible in this view; confirm against the full file.
 */
29 struct tevent_context *ev;
34 struct tevent_req *subreq;
/*
 * Forward declaration: callback that writer_send() attaches to every
 * pkt_write subrequest.
 */
37 static void writer_next(struct tevent_req *subreq);
/*
 * Start a writer request: create a tevent request that carries a
 * struct writer_state and issue the first pkt_write_send() on it.
 *
 * mem_ctx - talloc parent of the new request
 * ev      - event context; also passed to tevent_req_post() on failure
 * fd      - descriptor argument (presumably stored as state->fd; the
 *           assignment is not visible here)
 * buf     - packet bytes (presumably stored as state->buf; TODO confirm)
 * buflen  - number of bytes in buf, stored in state->buflen
 *
 * NOTE(review): several lines of this function (NULL check on req, most
 * state assignments, the final return) are missing from this view.
 */
39 static struct tevent_req *writer_send(TALLOC_CTX *mem_ctx,
40 struct tevent_context *ev,
41 int fd, uint8_t *buf, size_t buflen)
43 struct tevent_req *req, *subreq;
44 struct writer_state *state;
46 req = tevent_req_create(mem_ctx, &state, struct writer_state);
54 state->buflen = buflen;
/* Issue the first packet write, allocated as a talloc child of state. */
57 subreq = pkt_write_send(state, state->ev, state->fd,
58 state->buf, state->buflen);
/*
 * If the subrequest could not be allocated, hand the request to
 * tevent_req_post() and return it straight away.
 */
59 if (tevent_req_nomem(subreq, req)) {
60 return tevent_req_post(req, ev);
/*
 * Remember the in-flight subrequest: writer_handler() asserts that it is
 * set and forwards fd events to it.
 */
63 state->subreq = subreq;
64 tevent_req_set_callback(subreq, writer_next, req);
/*
 * Callback for a pkt_write subrequest. It collects the write result,
 * fails the parent request on an error or a short write, and otherwise
 * issues another write of the same buffer.
 *
 * NOTE(review): the local declarations, the condition guarding the first
 * tevent_req_error(), the early returns and the body of the count check
 * are missing from this view; confirm against the full file.
 */
68 static void writer_next(struct tevent_req *subreq)
/* The parent request was registered as callback data in writer_send(). */
70 struct tevent_req *req = tevent_req_callback_data(
71 subreq, struct tevent_req);
72 struct writer_state *state = tevent_req_data(
73 req, struct writer_state);
/* Fetch the byte count of the finished write; err receives the error code. */
77 nwritten = pkt_write_recv(subreq, &err);
81 tevent_req_error(req, err);
/* A write that did not cover the whole buffer is reported as EIO. */
85 if (nwritten != state->buflen) {
86 tevent_req_error(req, EIO);
/*
 * NOTE(review): presumably the stop condition after 1000 packets; the
 * body of this branch and the place where count is incremented are not
 * visible here.
 */
91 if (state->count >= 1000) {
/* Queue the next write of the same buffer. */
96 subreq = pkt_write_send(state, state->ev, state->fd,
97 state->buf, state->buflen);
98 if (tevent_req_nomem(subreq, req)) {
/* Track the new subrequest so writer_handler() can forward events to it. */
102 state->subreq = subreq;
103 tevent_req_set_callback(subreq, writer_next, req);
/*
 * Collect the outcome of a writer request.
 *
 * req  - request created by writer_send()
 * perr - NOTE(review): presumably receives the errno-style result; the
 *        assignments to *perr are not visible in this view.
 */
106 static void writer_recv(struct tevent_req *req, int *perr)
108 struct writer_state *state = tevent_req_data(
109 req, struct writer_state);
/*
 * Checks whether a subrequest is still recorded in the state.
 * NOTE(review): the body of this branch is not visible here.
 */
112 if (state->subreq != NULL) {
/* Ask tevent whether the request ended in an error; the code lands in err. */
117 if (tevent_req_is_unix_error(req, &err)) {
/*
 * fd event handler that writer() registers with tevent_add_fd().
 *
 * private_data is the writer request; the event is forwarded unchanged to
 * pkt_write_handler() together with the subrequest currently stored in
 * the writer state.
 */
125 static void writer_handler(struct tevent_context *ev, struct tevent_fd *fde,
126 uint16_t flags, void *private_data)
/* Aborts if private_data is not a struct tevent_req. */
128 struct tevent_req *req = talloc_get_type_abort(
129 private_data, struct tevent_req);
130 struct writer_state *state = tevent_req_data(
131 req, struct writer_state);
/* An fd event with no outstanding write subrequest is treated as a bug. */
133 assert(state->subreq != NULL);
134 pkt_write_handler(ev, fde, flags, state->subreq);
/*
 * Writer side of the test: for each of the four sizes in pkt_size, start
 * a writer request on fd and drive the event loop until it completes.
 *
 * NOTE(review): many lines are missing from this view (declarations of
 * mem_ctx, buflen, i and err, the loop bodies, and any checks after each
 * call); confirm against the full file.
 */
137 static void writer(int fd)
140 struct tevent_context *ev;
141 struct tevent_fd *fde;
142 struct tevent_req *req;
/*
 * 1 MiB automatic array, matching the largest entry in pkt_size.
 * NOTE(review): this sits on the stack; confirm that is acceptable for
 * the environments this test runs in.
 */
143 uint8_t buf[1024*1024];
/* Packet sizes exercised, smallest to largest. */
145 size_t pkt_size[4] = { 100, 500, 1024, 1024*1024 };
148 mem_ctx = talloc_new(NULL);
149 assert(mem_ctx != NULL);
151 ev = tevent_context_init(mem_ctx);
/*
 * NOTE(review): iterates over every byte of buf, presumably to fill it
 * with a pattern; the loop body is not visible here.
 */
154 for (i=0; i<1024*1024; i++) {
/* One writer request per packet size. */
158 for (i=0; i<4; i++) {
159 buflen = pkt_size[i];
/*
 * Store the packet length in the first four bytes of buf through a
 * uint32_t cast, i.e. in host byte order; reader_more() reads it back
 * the same way.
 */
160 *(uint32_t *)buf = buflen;
162 req = writer_send(mem_ctx, ev, fd, buf, buflen);
/* Watch fd for writability; events go to writer_handler with req attached. */
165 fde = tevent_add_fd(ev, mem_ctx, fd, TEVENT_FD_WRITE,
166 writer_handler, req);
/* Run the event loop for this request, then collect its result into err. */
169 tevent_req_poll(req, ev);
171 writer_recv(req, &err);
/* Releases the talloc context together with everything allocated under it. */
180 talloc_free(mem_ctx);
/*
 * State carried by the reader request.
 *
 * NOTE(review): the members between ev and subreq and the closing brace
 * are missing from this view; reader_send() and reader_done() also use
 * state->fd and state->buf.
 */
183 struct reader_state {
/* Event context passed to every pkt_read_send() call. */
184 struct tevent_context *ev;
/* Outstanding pkt_read subrequest; reader_done() resets it to NULL. */
187 struct tevent_req *subreq;
/*
 * Forward declarations: reader_more is the "more data" callback and
 * reader_done the completion callback passed along with pkt_read_send().
 */
190 static ssize_t reader_more(uint8_t *buf, size_t buflen, void *private_data);
191 static void reader_done(struct tevent_req *subreq);
193 static struct tevent_req *reader_send(TALLOC_CTX *mem_ctx,
194 struct tevent_context *ev,
197 struct tevent_req *req, *subreq;
198 struct reader_state *state;
200 req = tevent_req_create(mem_ctx, &state, struct reader_state);
208 subreq = pkt_read_send(state, state->ev, state->fd, 4,
209 state->buf, 1024, reader_more, NULL);
210 if (tevent_req_nomem(subreq, req)) {
211 tevent_req_post(req, ev);
214 state->subreq = subreq;
215 tevent_req_set_callback(subreq, reader_done, req);
/*
 * "More data" callback given to pkt_read_send(): reports how many bytes
 * are still needed to complete the current packet.
 *
 * buf          - bytes received so far; the first four bytes hold the
 *                total packet length in host byte order
 * buflen       - number of valid bytes in buf
 * private_data - unused
 *
 * Returns the number of bytes still missing: the rest of the length
 * header while fewer than four bytes have arrived, otherwise the packet
 * length minus buflen (0 once the packet is complete).
 */
static ssize_t reader_more(uint8_t *buf, size_t buflen, void *private_data)
{
	uint32_t pkt_len;
	uint8_t *dst = (uint8_t *)&pkt_len;
	size_t i;

	if (buflen < sizeof(pkt_len)) {
		return sizeof(pkt_len) - buflen;
	}

	/*
	 * Copy the header byte by byte instead of dereferencing a
	 * uint32_t pointer cast: buf has no alignment guarantee and is
	 * not a uint32_t object.
	 */
	for (i = 0; i < sizeof(pkt_len); i++) {
		dst[i] = buf[i];
	}

	return pkt_len - buflen;
}
/*
 * Callback for a pkt_read subrequest. It collects the packet that was
 * read, finishes or fails the parent request, and otherwise queues the
 * next packet read.
 *
 * NOTE(review): the local declarations and the conditions that choose
 * between tevent_req_done(), tevent_req_error() and the next read are
 * missing from this view; confirm against the full file.
 */
231 static void reader_done(struct tevent_req *subreq)
/* The parent request was registered as callback data for the subrequest. */
233 struct tevent_req *req = tevent_req_callback_data(
234 subreq, struct tevent_req);
235 struct reader_state *state = tevent_req_data(
236 req, struct reader_state);
/*
 * Fetch the result of the finished read: nread, the packet buffer, a
 * free_buf flag and an error code are all filled in by pkt_read_recv().
 */
242 nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
/* The subrequest is finished; stop tracking it. */
244 state->subreq = NULL;
247 tevent_req_done(req);
249 tevent_req_error(req, err);
/* Queue the next packet read with the same parameters as the first one. */
258 subreq = pkt_read_send(state, state->ev, state->fd, 4,
259 state->buf, 1024, reader_more, NULL);
260 if (tevent_req_nomem(subreq, req)) {
/* Track the new subrequest so reader_handler() can forward events to it. */
264 state->subreq = subreq;
265 tevent_req_set_callback(subreq, reader_done, req);
/*
 * Collect the outcome of a reader request.
 *
 * req  - request created by reader_send()
 * perr - NOTE(review): presumably receives the errno-style result; the
 *        assignments to *perr are not visible in this view.
 */
268 static void reader_recv(struct tevent_req *req, int *perr)
270 struct reader_state *state = tevent_req_data(
271 req, struct reader_state);
/*
 * Checks whether a subrequest is still recorded in the state.
 * NOTE(review): the body of this branch is not visible here.
 */
274 if (state->subreq != NULL) {
/* Ask tevent whether the request ended in an error; the code lands in err. */
278 if (tevent_req_is_unix_error(req, &err)) {
/*
 * fd event handler that reader() registers with tevent_add_fd().
 *
 * private_data is the reader request; the event is forwarded unchanged to
 * pkt_read_handler() together with the subrequest currently stored in
 * the reader state.
 */
286 static void reader_handler(struct tevent_context *ev, struct tevent_fd *fde,
287 uint16_t flags, void *private_data)
/* Aborts if private_data is not a struct tevent_req. */
289 struct tevent_req *req = talloc_get_type_abort(
290 private_data, struct tevent_req);
291 struct reader_state *state = tevent_req_data(
292 req, struct reader_state);
/* An fd event with no outstanding read subrequest is treated as a bug. */
294 assert(state->subreq != NULL);
295 pkt_read_handler(ev, fde, flags, state->subreq);
/*
 * Reader side of the test: start one reader request on fd and drive the
 * event loop until it completes.
 *
 * NOTE(review): the declarations of mem_ctx and err and any checks after
 * each call are missing from this view; confirm against the full file.
 */
298 static void reader(int fd)
301 struct tevent_context *ev;
302 struct tevent_fd *fde;
303 struct tevent_req *req;
306 mem_ctx = talloc_new(NULL);
307 assert(mem_ctx != NULL);
309 ev = tevent_context_init(mem_ctx);
312 req = reader_send(mem_ctx, ev, fd);
/* Watch fd for readability; events go to reader_handler with req attached. */
315 fde = tevent_add_fd(ev, mem_ctx, fd, TEVENT_FD_READ,
316 reader_handler, req);
/* Run the event loop for this request, then collect its result into err. */
319 tevent_req_poll(req, ev);
321 reader_recv(req, &err);
/* Releases the talloc context together with everything allocated under it. */
326 talloc_free(mem_ctx);
/*
 * Switch fd to non-blocking mode by adding O_NONBLOCK to its file status
 * flags.
 *
 * NOTE(review): the declaration of v, the check on the F_GETFL result
 * and the return statements are missing from this view. A caller tests
 * !set_nonblocking(fd[0]), so failure presumably returns false; confirm.
 */
329 static bool set_nonblocking(int fd)
/* Read the current file status flags. */
333 v = fcntl(fd, F_GETFL, 0);
/* Write them back with O_NONBLOCK added; -1 signals that fcntl failed. */
337 if (fcntl(fd, F_SETFL, v | O_NONBLOCK) == -1) {
363 if (!set_nonblocking(fd[0])) {