v3-4-ctdb: Add recfrom_send/recv
[obnox/samba-ctdb.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program.  If not, see <http://www.gnu.org/licenses/>.
18 */
19
20 #include "includes.h"
21 #include "lib/talloc/talloc.h"
22 #include "lib/tevent/tevent.h"
23 #include "lib/async_req/async_req.h"
24 #include "lib/async_req/async_sock.h"
25 #include "lib/util/tevent_unix.h"
26 #include <fcntl.h>
27
28 #ifndef TALLOC_FREE
29 #define TALLOC_FREE(ctx) do { talloc_free(ctx); ctx=NULL; } while(0)
30 #endif
31
32 /**
33  * @brief Map async_req states to unix-style errnos
34  * @param[in]  req      The async req to get the state from
35  * @param[out] err      Pointer to take the unix-style errno
36  *
37  * @return true if the async_req is in an error state, false otherwise
38  */
39
40 bool async_req_is_errno(struct async_req *req, int *err)
41 {
42         enum async_req_state state;
43         uint64_t error;
44
45         if (!async_req_is_error(req, &state, &error)) {
46                 return false;
47         }
48
49         switch (state) {
50         case ASYNC_REQ_USER_ERROR:
51                 *err = (int)error;
52                 break;
53         case ASYNC_REQ_TIMED_OUT:
54 #ifdef ETIMEDOUT
55                 *err = ETIMEDOUT;
56 #else
57                 *err = EAGAIN;
58 #endif
59                 break;
60         case ASYNC_REQ_NO_MEMORY:
61                 *err = ENOMEM;
62                 break;
63         default:
64                 *err = EIO;
65                 break;
66         }
67         return true;
68 }
69
70 int async_req_simple_recv_errno(struct async_req *req)
71 {
72         int err;
73
74         if (async_req_is_errno(req, &err)) {
75                 return err;
76         }
77
78         return 0;
79 }
80
81 struct async_send_state {
82         int fd;
83         const void *buf;
84         size_t len;
85         int flags;
86         ssize_t sent;
87 };
88
89 static void async_send_handler(struct tevent_context *ev,
90                                struct tevent_fd *fde,
91                                uint16_t flags, void *private_data);
92
93 struct tevent_req *async_send_send(TALLOC_CTX *mem_ctx,
94                                    struct tevent_context *ev,
95                                    int fd, const void *buf, size_t len,
96                                    int flags)
97 {
98         struct tevent_req *result;
99         struct async_send_state *state;
100         struct tevent_fd *fde;
101
102         result = tevent_req_create(mem_ctx, &state, struct async_send_state);
103         if (result == NULL) {
104                 return result;
105         }
106         state->fd = fd;
107         state->buf = buf;
108         state->len = len;
109         state->flags = flags;
110
111         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_WRITE, async_send_handler,
112                             result);
113         if (fde == NULL) {
114                 TALLOC_FREE(result);
115                 return NULL;
116         }
117         return result;
118 }
119
120 static void async_send_handler(struct tevent_context *ev,
121                                struct tevent_fd *fde,
122                                uint16_t flags, void *private_data)
123 {
124         struct tevent_req *req = talloc_get_type_abort(
125                 private_data, struct tevent_req);
126         struct async_send_state *state =
127                 tevent_req_data(req, struct async_send_state);
128
129         state->sent = send(state->fd, state->buf, state->len, state->flags);
130         if (state->sent == -1) {
131                 tevent_req_error(req, errno);
132                 return;
133         }
134         tevent_req_done(req);
135 }
136
137 ssize_t async_send_recv(struct tevent_req *req, int *perrno)
138 {
139         struct async_send_state *state =
140                 tevent_req_data(req, struct async_send_state);
141
142         if (tevent_req_is_unix_error(req, perrno)) {
143                 return -1;
144         }
145         return state->sent;
146 }
147
148 struct async_recv_state {
149         int fd;
150         void *buf;
151         size_t len;
152         int flags;
153         ssize_t received;
154 };
155
156 static void async_recv_handler(struct tevent_context *ev,
157                                struct tevent_fd *fde,
158                                uint16_t flags, void *private_data);
159
160 struct tevent_req *async_recv_send(TALLOC_CTX *mem_ctx,
161                                    struct tevent_context *ev,
162                                    int fd, void *buf, size_t len, int flags)
163 {
164         struct tevent_req *result;
165         struct async_recv_state *state;
166         struct tevent_fd *fde;
167
168         result = tevent_req_create(mem_ctx, &state, struct async_recv_state);
169         if (result == NULL) {
170                 return result;
171         }
172         state->fd = fd;
173         state->buf = buf;
174         state->len = len;
175         state->flags = flags;
176
177         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, async_recv_handler,
178                             result);
179         if (fde == NULL) {
180                 TALLOC_FREE(result);
181                 return NULL;
182         }
183         return result;
184 }
185
186 static void async_recv_handler(struct tevent_context *ev,
187                                struct tevent_fd *fde,
188                                uint16_t flags, void *private_data)
189 {
190         struct tevent_req *req = talloc_get_type_abort(
191                 private_data, struct tevent_req);
192         struct async_recv_state *state =
193                 tevent_req_data(req, struct async_recv_state);
194
195         state->received = recv(state->fd, state->buf, state->len,
196                                state->flags);
197         if (state->received == -1) {
198                 tevent_req_error(req, errno);
199                 return;
200         }
201         tevent_req_done(req);
202 }
203
204 ssize_t async_recv_recv(struct tevent_req *req, int *perrno)
205 {
206         struct async_recv_state *state =
207                 tevent_req_data(req, struct async_recv_state);
208
209         if (tevent_req_is_unix_error(req, perrno)) {
210                 return -1;
211         }
212         return state->received;
213 }
214
215 struct async_connect_state {
216         int fd;
217         int result;
218         int sys_errno;
219         long old_sockflags;
220 };
221
222 static void async_connect_connected(struct tevent_context *ev,
223                                     struct tevent_fd *fde, uint16_t flags,
224                                     void *priv);
225
226 /**
227  * @brief async version of connect(2)
228  * @param[in] mem_ctx   The memory context to hang the result off
229  * @param[in] ev        The event context to work from
230  * @param[in] fd        The socket to recv from
231  * @param[in] address   Where to connect?
232  * @param[in] address_len Length of *address
233  * @retval The async request
234  *
235  * This function sets the socket into non-blocking state to be able to call
236  * connect in an async state. This will be reset when the request is finished.
237  */
238
239 struct tevent_req *async_connect_send(TALLOC_CTX *mem_ctx,
240                                       struct tevent_context *ev,
241                                       int fd, const struct sockaddr *address,
242                                       socklen_t address_len)
243 {
244         struct tevent_req *result;
245         struct async_connect_state *state;
246         struct tevent_fd *fde;
247
248         result = tevent_req_create(
249                 mem_ctx, &state, struct async_connect_state);
250         if (result == NULL) {
251                 return NULL;
252         }
253
254         /**
255          * We have to set the socket to nonblocking for async connect(2). Keep
256          * the old sockflags around.
257          */
258
259         state->fd = fd;
260         state->sys_errno = 0;
261
262         state->old_sockflags = fcntl(fd, F_GETFL, 0);
263         if (state->old_sockflags == -1) {
264                 goto post_errno;
265         }
266
267         set_blocking(fd, false);
268
269         state->result = connect(fd, address, address_len);
270         if (state->result == 0) {
271                 tevent_req_done(result);
272                 goto done;
273         }
274
275         /**
276          * A number of error messages show that something good is progressing
277          * and that we have to wait for readability.
278          *
279          * If none of them are present, bail out.
280          */
281
282         if (!(errno == EINPROGRESS || errno == EALREADY ||
283 #ifdef EISCONN
284               errno == EISCONN ||
285 #endif
286               errno == EAGAIN || errno == EINTR)) {
287                 state->sys_errno = errno;
288                 goto post_errno;
289         }
290
291         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ | TEVENT_FD_WRITE,
292                            async_connect_connected, result);
293         if (fde == NULL) {
294                 state->sys_errno = ENOMEM;
295                 goto post_errno;
296         }
297         return result;
298
299  post_errno:
300         tevent_req_error(result, state->sys_errno);
301  done:
302         fcntl(fd, F_SETFL, state->old_sockflags);
303         return tevent_req_post(result, ev);
304 }
305
306 /**
307  * fde event handler for connect(2)
308  * @param[in] ev        The event context that sent us here
309  * @param[in] fde       The file descriptor event associated with the connect
310  * @param[in] flags     Indicate read/writeability of the socket
311  * @param[in] priv      private data, "struct async_req *" in this case
312  */
313
314 static void async_connect_connected(struct tevent_context *ev,
315                                     struct tevent_fd *fde, uint16_t flags,
316                                     void *priv)
317 {
318         struct tevent_req *req = talloc_get_type_abort(
319                 priv, struct tevent_req);
320         struct async_connect_state *state =
321                 tevent_req_data(req, struct async_connect_state);
322
323         TALLOC_FREE(fde);
324
325         /*
326          * Stevens, Network Programming says that if there's a
327          * successful connect, the socket is only writable. Upon an
328          * error, it's both readable and writable.
329          */
330         if ((flags & (TEVENT_FD_READ|TEVENT_FD_WRITE))
331             == (TEVENT_FD_READ|TEVENT_FD_WRITE)) {
332                 int sockerr;
333                 socklen_t err_len = sizeof(sockerr);
334
335                 if (getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
336                                (void *)&sockerr, &err_len) == 0) {
337                         errno = sockerr;
338                 }
339
340                 state->sys_errno = errno;
341
342                 DEBUG(10, ("connect returned %s\n", strerror(errno)));
343
344                 fcntl(state->fd, F_SETFL, state->old_sockflags);
345                 tevent_req_error(req, state->sys_errno);
346                 return;
347         }
348
349         state->sys_errno = 0;
350         tevent_req_done(req);
351 }
352
353 int async_connect_recv(struct tevent_req *req, int *perrno)
354 {
355         struct async_connect_state *state =
356                 tevent_req_data(req, struct async_connect_state);
357         int err;
358
359         fcntl(state->fd, F_SETFL, state->old_sockflags);
360
361         if (tevent_req_is_unix_error(req, &err)) {
362                 *perrno = err;
363                 return -1;
364         }
365
366         if (state->sys_errno == 0) {
367                 return 0;
368         }
369
370         *perrno = state->sys_errno;
371         return -1;
372 }
373
374 struct writev_state {
375         struct tevent_context *ev;
376         int fd;
377         struct iovec *iov;
378         int count;
379         size_t total_size;
380 };
381
382 static void writev_trigger(struct tevent_req *req, void *private_data);
383 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
384                            uint16_t flags, void *private_data);
385
386 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
387                                struct tevent_queue *queue, int fd,
388                                struct iovec *iov, int count)
389 {
390         struct tevent_req *result;
391         struct writev_state *state;
392
393         result = tevent_req_create(mem_ctx, &state, struct writev_state);
394         if (result == NULL) {
395                 return NULL;
396         }
397         state->ev = ev;
398         state->fd = fd;
399         state->total_size = 0;
400         state->count = count;
401         state->iov = (struct iovec *)talloc_memdup(
402                 state, iov, sizeof(struct iovec) * count);
403         if (state->iov == NULL) {
404                 goto fail;
405         }
406
407         if (queue == NULL) {
408                 writev_trigger(result, NULL);
409                 if (!tevent_req_is_in_progress(result)) {
410                         return tevent_req_post(result, ev);
411                 }
412                 return result;
413         }
414         if (!tevent_queue_add(queue, ev, result, writev_trigger, NULL)) {
415                 goto fail;
416         }
417         return result;
418  fail:
419         TALLOC_FREE(result);
420         return NULL;
421 }
422
423 static void writev_trigger(struct tevent_req *req, void *private_data)
424 {
425         struct writev_state *state = tevent_req_data(req, struct writev_state);
426         struct tevent_fd *fde;
427
428         fde = tevent_add_fd(state->ev, state, state->fd, TEVENT_FD_WRITE,
429                             writev_handler, req);
430         if (fde == NULL) {
431                 tevent_req_error(req, ENOMEM);
432         }
433 }
434
435 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
436                            uint16_t flags, void *private_data)
437 {
438         struct tevent_req *req = talloc_get_type_abort(
439                 private_data, struct tevent_req);
440         struct writev_state *state =
441                 tevent_req_data(req, struct writev_state);
442         size_t to_write, written;
443         int i;
444
445         to_write = 0;
446
447         for (i=0; i<state->count; i++) {
448                 to_write += state->iov[i].iov_len;
449         }
450
451         written = sys_writev(state->fd, state->iov, state->count);
452         if (written == -1) {
453                 tevent_req_error(req, errno);
454                 return;
455         }
456         if (written == 0) {
457                 tevent_req_error(req, EPIPE);
458                 return;
459         }
460         state->total_size += written;
461
462         if (written == to_write) {
463                 tevent_req_done(req);
464                 return;
465         }
466
467         /*
468          * We've written less than we were asked to, drop stuff from
469          * state->iov.
470          */
471
472         while (written > 0) {
473                 if (written < state->iov[0].iov_len) {
474                         state->iov[0].iov_base =
475                                 (char *)state->iov[0].iov_base + written;
476                         state->iov[0].iov_len -= written;
477                         break;
478                 }
479                 written -= state->iov[0].iov_len;
480                 state->iov += 1;
481                 state->count -= 1;
482         }
483 }
484
485 ssize_t writev_recv(struct tevent_req *req, int *perrno)
486 {
487         struct writev_state *state =
488                 tevent_req_data(req, struct writev_state);
489
490         if (tevent_req_is_unix_error(req, perrno)) {
491                 return -1;
492         }
493         return state->total_size;
494 }
495
496 struct read_packet_state {
497         int fd;
498         uint8_t *buf;
499         size_t nread;
500         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
501         void *private_data;
502 };
503
504 static void read_packet_handler(struct tevent_context *ev,
505                                 struct tevent_fd *fde,
506                                 uint16_t flags, void *private_data);
507
508 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
509                                     struct tevent_context *ev,
510                                     int fd, size_t initial,
511                                     ssize_t (*more)(uint8_t *buf,
512                                                     size_t buflen,
513                                                     void *private_data),
514                                     void *private_data)
515 {
516         struct tevent_req *result;
517         struct read_packet_state *state;
518         struct tevent_fd *fde;
519
520         result = tevent_req_create(mem_ctx, &state, struct read_packet_state);
521         if (result == NULL) {
522                 return NULL;
523         }
524         state->fd = fd;
525         state->nread = 0;
526         state->more = more;
527         state->private_data = private_data;
528
529         state->buf = talloc_array(state, uint8_t, initial);
530         if (state->buf == NULL) {
531                 goto fail;
532         }
533
534         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, read_packet_handler,
535                             result);
536         if (fde == NULL) {
537                 goto fail;
538         }
539         return result;
540  fail:
541         TALLOC_FREE(result);
542         return NULL;
543 }
544
545 static void read_packet_handler(struct tevent_context *ev,
546                                 struct tevent_fd *fde,
547                                 uint16_t flags, void *private_data)
548 {
549         struct tevent_req *req = talloc_get_type_abort(
550                 private_data, struct tevent_req);
551         struct read_packet_state *state =
552                 tevent_req_data(req, struct read_packet_state);
553         size_t total = talloc_get_size(state->buf);
554         ssize_t nread, more;
555         uint8_t *tmp;
556
557         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
558                      0);
559         if (nread == -1) {
560                 tevent_req_error(req, errno);
561                 return;
562         }
563         if (nread == 0) {
564                 tevent_req_error(req, EPIPE);
565                 return;
566         }
567
568         state->nread += nread;
569         if (state->nread < total) {
570                 /* Come back later */
571                 return;
572         }
573
574         /*
575          * We got what was initially requested. See if "more" asks for -- more.
576          */
577         if (state->more == NULL) {
578                 /* Nobody to ask, this is a async read_data */
579                 tevent_req_done(req);
580                 return;
581         }
582
583         more = state->more(state->buf, total, state->private_data);
584         if (more == -1) {
585                 /* We got an invalid packet, tell the caller */
586                 tevent_req_error(req, EIO);
587                 return;
588         }
589         if (more == 0) {
590                 /* We're done, full packet received */
591                 tevent_req_done(req);
592                 return;
593         }
594
595         tmp = TALLOC_REALLOC_ARRAY(state, state->buf, uint8_t, total+more);
596         if (tevent_req_nomem(tmp, req)) {
597                 return;
598         }
599         state->buf = tmp;
600 }
601
602 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
603                          uint8_t **pbuf, int *perrno)
604 {
605         struct read_packet_state *state =
606                 tevent_req_data(req, struct read_packet_state);
607
608         if (tevent_req_is_unix_error(req, perrno)) {
609                 return -1;
610         }
611         *pbuf = talloc_move(mem_ctx, &state->buf);
612         return talloc_get_size(*pbuf);
613 }
614
615 struct recvfrom_state {
616         int fd;
617         void *buf;
618         size_t len;
619         int flags;
620         struct sockaddr_storage *addr;
621         socklen_t *addr_len;
622         ssize_t received;
623 };
624
625 static void recvfrom_handler(struct tevent_context *ev,
626                                struct tevent_fd *fde,
627                                uint16_t flags, void *private_data);
628
629 struct tevent_req *recvfrom_send(TALLOC_CTX *mem_ctx,
630                                  struct tevent_context *ev,
631                                  int fd, void *buf, size_t len, int flags,
632                                  struct sockaddr_storage *addr,
633                                  socklen_t *addr_len)
634 {
635         struct tevent_req *result;
636         struct recvfrom_state *state;
637         struct tevent_fd *fde;
638
639         result = tevent_req_create(mem_ctx, &state, struct recvfrom_state);
640         if (result == NULL) {
641                 return result;
642         }
643         state->fd = fd;
644         state->buf = buf;
645         state->len = len;
646         state->flags = flags;
647         state->addr = addr;
648         state->addr_len = addr_len;
649
650         fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ, recvfrom_handler,
651                             result);
652         if (fde == NULL) {
653                 TALLOC_FREE(result);
654                 return NULL;
655         }
656         return result;
657 }
658
659 static void recvfrom_handler(struct tevent_context *ev,
660                                struct tevent_fd *fde,
661                                uint16_t flags, void *private_data)
662 {
663         struct tevent_req *req = talloc_get_type_abort(
664                 private_data, struct tevent_req);
665         struct recvfrom_state *state =
666                 tevent_req_data(req, struct recvfrom_state);
667
668         state->received = recvfrom(state->fd, state->buf, state->len,
669                                    state->flags, (struct sockaddr *)state->addr,
670                                    state->addr_len);
671         if ((state->received == -1) && (errno == EINTR)) {
672                 /* retry */
673                 return;
674         }
675         if (state->received == 0) {
676                 tevent_req_error(req, EPIPE);
677                 return;
678         }
679         if (state->received == -1) {
680                 tevent_req_error(req, errno);
681                 return;
682         }
683         tevent_req_done(req);
684 }
685
686 ssize_t recvfrom_recv(struct tevent_req *req, int *perrno)
687 {
688         struct recvfrom_state *state =
689                 tevent_req_data(req, struct recvfrom_state);
690
691         if (tevent_req_is_unix_error(req, perrno)) {
692                 return -1;
693         }
694         return state->received;
695 }