ctdb-build: Fix -O3 developer build
[obnox/samba/samba-obnox.git] / ctdb / tests / src / pkt_write_test.c
1 /*
2    packet write tests
3
4    Copyright (C) Amitay Isaacs  2015
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "replace.h"
21 #include "system/filesys.h"
22
23 #include <assert.h>
24
25 #include "common/pkt_read.c"
26 #include "common/pkt_write.c"
27
28 struct writer_state {
29         struct tevent_context *ev;
30         int fd;
31         uint8_t *buf;
32         size_t buflen;
33         int  count;
34         struct tevent_req *subreq;
35 };
36
37 static void writer_next(struct tevent_req *subreq);
38
39 static struct tevent_req *writer_send(TALLOC_CTX *mem_ctx,
40                                       struct tevent_context *ev,
41                                       int fd, uint8_t *buf, size_t buflen)
42 {
43         struct tevent_req *req, *subreq;
44         struct writer_state *state;
45
46         req = tevent_req_create(mem_ctx, &state, struct writer_state);
47         if (req == NULL) {
48                 return NULL;
49         }
50
51         state->ev = ev;
52         state->fd = fd;
53         state->buf = buf;
54         state->buflen = buflen;
55         state->count = 0;
56
57         subreq = pkt_write_send(state, state->ev, state->fd,
58                                 state->buf, state->buflen);
59         if (tevent_req_nomem(subreq, req)) {
60                 return tevent_req_post(req, ev);
61         }
62
63         state->subreq = subreq;
64         tevent_req_set_callback(subreq, writer_next, req);
65         return req;
66 }
67
68 static void writer_next(struct tevent_req *subreq)
69 {
70         struct tevent_req *req = tevent_req_callback_data(
71                 subreq, struct tevent_req);
72         struct writer_state *state = tevent_req_data(
73                 req, struct writer_state);
74         ssize_t nwritten;
75         int err = 0;
76
77         nwritten = pkt_write_recv(subreq, &err);
78         TALLOC_FREE(subreq);
79         state->subreq = NULL;
80         if (nwritten == -1) {
81                 tevent_req_error(req, err);
82                 return;
83         }
84
85         if (nwritten != state->buflen) {
86                 tevent_req_error(req, EIO);
87                 return;
88         }
89
90         state->count++;
91         if (state->count >= 1000) {
92                 tevent_req_done(req);
93                 return;
94         }
95
96         subreq = pkt_write_send(state, state->ev, state->fd,
97                                 state->buf, state->buflen);
98         if (tevent_req_nomem(subreq, req)) {
99                 return;
100         }
101
102         state->subreq = subreq;
103         tevent_req_set_callback(subreq, writer_next, req);
104 }
105
106 static void writer_recv(struct tevent_req *req, int *perr)
107 {
108         struct writer_state *state = tevent_req_data(
109                 req, struct writer_state);
110         int err = 0;
111
112         if (state->subreq != NULL) {
113                 *perr = -1;
114                 return;
115         }
116
117         if (tevent_req_is_unix_error(req, &err)) {
118                 *perr = err;
119                 return;
120         }
121
122         *perr = 0;
123 }
124
125 static void writer_handler(struct tevent_context *ev, struct tevent_fd *fde,
126                            uint16_t flags, void *private_data)
127 {
128         struct tevent_req *req = talloc_get_type_abort(
129                 private_data, struct tevent_req);
130         struct writer_state *state = tevent_req_data(
131                 req, struct writer_state);
132
133         assert(state->subreq != NULL);
134         pkt_write_handler(ev, fde, flags, state->subreq);
135 }
136
137 static void writer(int fd)
138 {
139         TALLOC_CTX *mem_ctx;
140         struct tevent_context *ev;
141         struct tevent_fd *fde;
142         struct tevent_req *req;
143         uint8_t buf[1024*1024];
144         size_t buflen;
145         size_t pkt_size[4] = { 100, 500, 1024, 1024*1024 };
146         int i, err;
147
148         mem_ctx = talloc_new(NULL);
149         assert(mem_ctx != NULL);
150
151         ev = tevent_context_init(mem_ctx);
152         assert(ev != NULL);
153
154         for (i=0; i<1024*1024; i++) {
155                 buf[i] = i%256;
156         }
157
158         for (i=0; i<4; i++) {
159                 buflen = pkt_size[i];
160                 memcpy(buf, &buflen, sizeof(buflen));
161
162                 req = writer_send(mem_ctx, ev, fd, buf, buflen);
163                 assert(req != NULL);
164
165                 fde = tevent_add_fd(ev, mem_ctx, fd, TEVENT_FD_WRITE,
166                                     writer_handler, req);
167                 assert(fde != NULL);
168
169                 tevent_req_poll(req, ev);
170
171                 writer_recv(req, &err);
172                 assert(err == 0);
173
174                 talloc_free(fde);
175                 talloc_free(req);
176         }
177
178         close(fd);
179
180         talloc_free(mem_ctx);
181 }
182
183 struct reader_state {
184         struct tevent_context *ev;
185         int fd;
186         uint8_t buf[1024];
187         struct tevent_req *subreq;
188 };
189
190 static ssize_t reader_more(uint8_t *buf, size_t buflen, void *private_data);
191 static void reader_done(struct tevent_req *subreq);
192
193 static struct tevent_req *reader_send(TALLOC_CTX *mem_ctx,
194                                       struct tevent_context *ev,
195                                       int fd)
196 {
197         struct tevent_req *req, *subreq;
198         struct reader_state *state;
199
200         req = tevent_req_create(mem_ctx, &state, struct reader_state);
201         if (req == NULL) {
202                 return NULL;
203         }
204
205         state->ev = ev;
206         state->fd = fd;
207
208         subreq = pkt_read_send(state, state->ev, state->fd, 4,
209                                state->buf, 1024, reader_more, NULL);
210         if (tevent_req_nomem(subreq, req)) {
211                 tevent_req_post(req, ev);
212         }
213
214         state->subreq = subreq;
215         tevent_req_set_callback(subreq, reader_done, req);
216         return req;
217 }
218
219 static ssize_t reader_more(uint8_t *buf, size_t buflen, void *private_data)
220 {
221         uint32_t pkt_len;
222
223         if (buflen < sizeof(pkt_len)) {
224                 return sizeof(pkt_len) - buflen;
225         }
226
227         pkt_len = *(uint32_t *)buf;
228         return pkt_len - buflen;
229 }
230
231 static void reader_done(struct tevent_req *subreq)
232 {
233         struct tevent_req *req = tevent_req_callback_data(
234                 subreq, struct tevent_req);
235         struct reader_state *state = tevent_req_data(
236                 req, struct reader_state);
237         ssize_t nread;
238         uint8_t *buf;
239         bool free_buf;
240         int err = 0;
241
242         nread = pkt_read_recv(subreq, state, &buf, &free_buf, &err);
243         TALLOC_FREE(subreq);
244         state->subreq = NULL;
245         if (nread == -1) {
246                 if (err == EPIPE) {
247                         tevent_req_done(req);
248                 } else {
249                         tevent_req_error(req, err);
250                 }
251                 return;
252         }
253
254         if (free_buf) {
255                 talloc_free(buf);
256         }
257
258         subreq = pkt_read_send(state, state->ev, state->fd, 4,
259                                state->buf, 1024, reader_more, NULL);
260         if (tevent_req_nomem(subreq, req)) {
261                 return;
262         }
263
264         state->subreq = subreq;
265         tevent_req_set_callback(subreq, reader_done, req);
266 }
267
268 static void reader_recv(struct tevent_req *req, int *perr)
269 {
270         struct reader_state *state = tevent_req_data(
271                 req, struct reader_state);
272         int err = 0;
273
274         if (state->subreq != NULL) {
275                 *perr = -1;
276         }
277
278         if (tevent_req_is_unix_error(req, &err)) {
279                 *perr = err;
280                 return;
281         }
282
283         *perr = 0;
284 }
285
286 static void reader_handler(struct tevent_context *ev, struct tevent_fd *fde,
287                            uint16_t flags, void *private_data)
288 {
289         struct tevent_req *req = talloc_get_type_abort(
290                 private_data, struct tevent_req);
291         struct reader_state *state = tevent_req_data(
292                 req, struct reader_state);
293
294         assert(state->subreq != NULL);
295         pkt_read_handler(ev, fde, flags, state->subreq);
296 }
297
298 static void reader(int fd)
299 {
300         TALLOC_CTX *mem_ctx;
301         struct tevent_context *ev;
302         struct tevent_fd *fde;
303         struct tevent_req *req;
304         int err;
305
306         mem_ctx = talloc_new(NULL);
307         assert(mem_ctx != NULL);
308
309         ev = tevent_context_init(mem_ctx);
310         assert(ev != NULL);
311
312         req = reader_send(mem_ctx, ev, fd);
313         assert(req != NULL);
314
315         fde = tevent_add_fd(ev, mem_ctx, fd, TEVENT_FD_READ,
316                             reader_handler, req);
317         assert(fde != NULL);
318
319         tevent_req_poll(req, ev);
320
321         reader_recv(req, &err);
322         assert(err == 0);
323
324         close(fd);
325
326         talloc_free(mem_ctx);
327 }
328
329 static bool set_nonblocking(int fd)
330 {
331         int v;
332
333         v = fcntl(fd, F_GETFL, 0);
334         if (v == -1) {
335                 return false;
336         }
337         if (fcntl(fd, F_SETFL, v | O_NONBLOCK) == -1) {
338                 return false;
339         }
340         return true;
341 }
342
343 int main(void)
344 {
345         int fd[2];
346         int ret;
347         pid_t pid;
348
349         ret = pipe(fd);
350         assert(ret == 0);
351
352         pid = fork();
353         assert(pid != -1);
354
355         if (pid == 0) {
356                 /* Child process */
357                 close(fd[0]);
358                 writer(fd[1]);
359                 exit(0);
360         }
361
362         close(fd[1]);
363         if (!set_nonblocking(fd[0])) {
364                 exit(1);
365         }
366
367         reader(fd[0]);
368
369         return 0;
370 }