lib: Use optimize_empty in writev_send()
[samba.git] / lib / async_req / async_sock.c
1 /*
2    Unix SMB/CIFS implementation.
3    async socket syscalls
4    Copyright (C) Volker Lendecke 2008
5
6      ** NOTE! The following LGPL license applies to the async_sock
7      ** library. This does NOT imply that all of Samba is released
8      ** under the LGPL
9
10    This library is free software; you can redistribute it and/or
11    modify it under the terms of the GNU Lesser General Public
12    License as published by the Free Software Foundation; either
13    version 3 of the License, or (at your option) any later version.
14
15    This library is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
18    Library General Public License for more details.
19
20    You should have received a copy of the GNU Lesser General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23
24 #include "replace.h"
25 #include "system/network.h"
26 #include "system/filesys.h"
27 #include <talloc.h>
28 #include <tevent.h>
29 #include "lib/async_req/async_sock.h"
30 #include "lib/util/iov_buf.h"
31
32 /* Note: lib/util/ is currently GPL */
33 #include "lib/util/tevent_unix.h"
34 #include "lib/util/samba_util.h"
35
36 struct async_connect_state {
37         int fd;
38         struct tevent_fd *fde;
39         int result;
40         long old_sockflags;
41         socklen_t address_len;
42         struct sockaddr_storage address;
43
44         void (*before_connect)(void *private_data);
45         void (*after_connect)(void *private_data);
46         void *private_data;
47 };
48
49 static void async_connect_cleanup(struct tevent_req *req,
50                                   enum tevent_req_state req_state);
51 static void async_connect_connected(struct tevent_context *ev,
52                                     struct tevent_fd *fde, uint16_t flags,
53                                     void *priv);
54
55 /**
56  * @brief async version of connect(2)
57  * @param[in] mem_ctx   The memory context to hang the result off
58  * @param[in] ev        The event context to work from
59  * @param[in] fd        The socket to recv from
60  * @param[in] address   Where to connect?
61  * @param[in] address_len Length of *address
62  * @retval The async request
63  *
64  * This function sets the socket into non-blocking state to be able to call
65  * connect in an async state. This will be reset when the request is finished.
66  */
67
68 struct tevent_req *async_connect_send(
69         TALLOC_CTX *mem_ctx, struct tevent_context *ev, int fd,
70         const struct sockaddr *address, socklen_t address_len,
71         void (*before_connect)(void *private_data),
72         void (*after_connect)(void *private_data),
73         void *private_data)
74 {
75         struct tevent_req *req;
76         struct async_connect_state *state;
77         int ret;
78
79         req = tevent_req_create(mem_ctx, &state, struct async_connect_state);
80         if (req == NULL) {
81                 return NULL;
82         }
83
84         /**
85          * We have to set the socket to nonblocking for async connect(2). Keep
86          * the old sockflags around.
87          */
88
89         state->fd = fd;
90         state->before_connect = before_connect;
91         state->after_connect = after_connect;
92         state->private_data = private_data;
93
94         state->old_sockflags = fcntl(fd, F_GETFL, 0);
95         if (state->old_sockflags == -1) {
96                 tevent_req_error(req, errno);
97                 return tevent_req_post(req, ev);
98         }
99
100         tevent_req_set_cleanup_fn(req, async_connect_cleanup);
101
102         state->address_len = address_len;
103         if (address_len > sizeof(state->address)) {
104                 tevent_req_error(req, EINVAL);
105                 return tevent_req_post(req, ev);
106         }
107         memcpy(&state->address, address, address_len);
108
109         ret = set_blocking(fd, false);
110         if (ret == -1) {
111                 tevent_req_error(req, errno);
112                 return tevent_req_post(req, ev);
113         }
114
115         if (state->before_connect != NULL) {
116                 state->before_connect(state->private_data);
117         }
118
119         state->result = connect(fd, address, address_len);
120
121         if (state->after_connect != NULL) {
122                 state->after_connect(state->private_data);
123         }
124
125         if (state->result == 0) {
126                 tevent_req_done(req);
127                 return tevent_req_post(req, ev);
128         }
129
130         /*
131          * The only errno indicating that an initial connect is still
132          * in flight is EINPROGRESS.
133          *
134          * This allows callers like open_socket_out_send() to reuse
135          * fds and call us with an fd for which the connect is still
136          * in flight. The proper thing to do for callers would be
137          * closing the fd and starting from scratch with a fresh
138          * socket.
139          */
140
141         if (errno != EINPROGRESS) {
142                 tevent_req_error(req, errno);
143                 return tevent_req_post(req, ev);
144         }
145
146         /*
147          * Note for historic reasons TEVENT_FD_WRITE is not enough
148          * to get notified for POLLERR or EPOLLHUP even if they
149          * come together with POLLOUT. That means we need to
150          * use TEVENT_FD_READ in addition until we have
151          * TEVENT_FD_ERROR.
152          */
153         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ|TEVENT_FD_WRITE,
154                                    async_connect_connected, req);
155         if (state->fde == NULL) {
156                 tevent_req_error(req, ENOMEM);
157                 return tevent_req_post(req, ev);
158         }
159         return req;
160 }
161
162 static void async_connect_cleanup(struct tevent_req *req,
163                                   enum tevent_req_state req_state)
164 {
165         struct async_connect_state *state =
166                 tevent_req_data(req, struct async_connect_state);
167
168         TALLOC_FREE(state->fde);
169         if (state->fd != -1) {
170                 int ret;
171
172                 ret = fcntl(state->fd, F_SETFL, state->old_sockflags);
173                 if (ret == -1) {
174                         abort();
175                 }
176
177                 state->fd = -1;
178         }
179 }
180
181 /**
182  * fde event handler for connect(2)
183  * @param[in] ev        The event context that sent us here
184  * @param[in] fde       The file descriptor event associated with the connect
185  * @param[in] flags     Indicate read/writeability of the socket
186  * @param[in] priv      private data, "struct async_req *" in this case
187  */
188
189 static void async_connect_connected(struct tevent_context *ev,
190                                     struct tevent_fd *fde, uint16_t flags,
191                                     void *priv)
192 {
193         struct tevent_req *req = talloc_get_type_abort(
194                 priv, struct tevent_req);
195         struct async_connect_state *state =
196                 tevent_req_data(req, struct async_connect_state);
197         int ret;
198         int socket_error = 0;
199         socklen_t slen = sizeof(socket_error);
200
201         ret = getsockopt(state->fd, SOL_SOCKET, SO_ERROR,
202                          &socket_error, &slen);
203
204         if (ret != 0) {
205                 /*
206                  * According to Stevens this is the Solaris behaviour
207                  * in case the connection encountered an error:
208                  * getsockopt() fails, error is in errno
209                  */
210                 tevent_req_error(req, errno);
211                 return;
212         }
213
214         if (socket_error != 0) {
215                 /*
216                  * Berkeley derived implementations (including) Linux
217                  * return the pending error via socket_error.
218                  */
219                 tevent_req_error(req, socket_error);
220                 return;
221         }
222
223         tevent_req_done(req);
224         return;
225 }
226
227 int async_connect_recv(struct tevent_req *req, int *perrno)
228 {
229         int err = tevent_req_simple_recv_unix(req);
230
231         if (err != 0) {
232                 *perrno = err;
233                 return -1;
234         }
235
236         return 0;
237 }
238
239 struct writev_state {
240         struct tevent_context *ev;
241         struct tevent_queue_entry *queue_entry;
242         int fd;
243         struct tevent_fd *fde;
244         struct iovec *iov;
245         int count;
246         size_t total_size;
247         uint16_t flags;
248         bool err_on_readability;
249 };
250
251 static void writev_cleanup(struct tevent_req *req,
252                            enum tevent_req_state req_state);
253 static bool writev_cancel(struct tevent_req *req);
254 static void writev_trigger(struct tevent_req *req, void *private_data);
255 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
256                            uint16_t flags, void *private_data);
257
258 struct tevent_req *writev_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
259                                struct tevent_queue *queue, int fd,
260                                bool err_on_readability,
261                                struct iovec *iov, int count)
262 {
263         struct tevent_req *req;
264         struct writev_state *state;
265
266         req = tevent_req_create(mem_ctx, &state, struct writev_state);
267         if (req == NULL) {
268                 return NULL;
269         }
270         state->ev = ev;
271         state->fd = fd;
272         state->total_size = 0;
273         state->count = count;
274         state->iov = (struct iovec *)talloc_memdup(
275                 state, iov, sizeof(struct iovec) * count);
276         if (tevent_req_nomem(state->iov, req)) {
277                 return tevent_req_post(req, ev);
278         }
279         state->flags = TEVENT_FD_WRITE|TEVENT_FD_READ;
280         state->err_on_readability = err_on_readability;
281
282         tevent_req_set_cleanup_fn(req, writev_cleanup);
283         tevent_req_set_cancel_fn(req, writev_cancel);
284
285         if (queue == NULL) {
286                 state->fde = tevent_add_fd(state->ev, state, state->fd,
287                                     state->flags, writev_handler, req);
288                 if (tevent_req_nomem(state->fde, req)) {
289                         return tevent_req_post(req, ev);
290                 }
291                 return req;
292         }
293
294         state->queue_entry = tevent_queue_add_optimize_empty(
295                 queue, ev, req, writev_trigger, NULL);
296         if (tevent_req_nomem(state->queue_entry, req)) {
297                 return tevent_req_post(req, ev);
298         }
299         return req;
300 }
301
302 static void writev_cleanup(struct tevent_req *req,
303                            enum tevent_req_state req_state)
304 {
305         struct writev_state *state = tevent_req_data(req, struct writev_state);
306
307         TALLOC_FREE(state->queue_entry);
308         TALLOC_FREE(state->fde);
309 }
310
311 static bool writev_cancel(struct tevent_req *req)
312 {
313         struct writev_state *state = tevent_req_data(req, struct writev_state);
314
315         if (state->total_size > 0) {
316                 /*
317                  * We've already started to write :-(
318                  */
319                 return false;
320         }
321
322         TALLOC_FREE(state->queue_entry);
323         TALLOC_FREE(state->fde);
324
325         tevent_req_defer_callback(req, state->ev);
326         tevent_req_error(req, ECANCELED);
327         return true;
328 }
329
330 static void writev_trigger(struct tevent_req *req, void *private_data)
331 {
332         struct writev_state *state = tevent_req_data(req, struct writev_state);
333
334         state->queue_entry = NULL;
335
336         state->fde = tevent_add_fd(state->ev, state, state->fd, state->flags,
337                             writev_handler, req);
338         if (tevent_req_nomem(state->fde, req)) {
339                 return;
340         }
341 }
342
343 static void writev_handler(struct tevent_context *ev, struct tevent_fd *fde,
344                            uint16_t flags, void *private_data)
345 {
346         struct tevent_req *req = talloc_get_type_abort(
347                 private_data, struct tevent_req);
348         struct writev_state *state =
349                 tevent_req_data(req, struct writev_state);
350         ssize_t written;
351         bool ok;
352
353         if ((state->flags & TEVENT_FD_READ) && (flags & TEVENT_FD_READ)) {
354                 int ret, value;
355
356                 if (state->err_on_readability) {
357                         /* Readable and the caller wants an error on read. */
358                         tevent_req_error(req, EPIPE);
359                         return;
360                 }
361
362                 /* Might be an error. Check if there are bytes to read */
363                 ret = ioctl(state->fd, FIONREAD, &value);
364                 /* FIXME - should we also check
365                    for ret == 0 and value == 0 here ? */
366                 if (ret == -1) {
367                         /* There's an error. */
368                         tevent_req_error(req, EPIPE);
369                         return;
370                 }
371                 /* A request for TEVENT_FD_READ will succeed from now and
372                    forevermore until the bytes are read so if there was
373                    an error we'll wait until we do read, then get it in
374                    the read callback function. Until then, remove TEVENT_FD_READ
375                    from the flags we're waiting for. */
376                 state->flags &= ~TEVENT_FD_READ;
377                 TEVENT_FD_NOT_READABLE(fde);
378
379                 /* If not writable, we're done. */
380                 if (!(flags & TEVENT_FD_WRITE)) {
381                         return;
382                 }
383         }
384
385         written = writev(state->fd, state->iov, state->count);
386         if ((written == -1) && (errno == EINTR)) {
387                 /* retry */
388                 return;
389         }
390         if (written == -1) {
391                 tevent_req_error(req, errno);
392                 return;
393         }
394         if (written == 0) {
395                 tevent_req_error(req, EPIPE);
396                 return;
397         }
398         state->total_size += written;
399
400         ok = iov_advance(&state->iov, &state->count, written);
401         if (!ok) {
402                 tevent_req_error(req, EIO);
403                 return;
404         }
405
406         if (state->count == 0) {
407                 tevent_req_done(req);
408                 return;
409         }
410 }
411
412 ssize_t writev_recv(struct tevent_req *req, int *perrno)
413 {
414         struct writev_state *state =
415                 tevent_req_data(req, struct writev_state);
416         ssize_t ret;
417
418         if (tevent_req_is_unix_error(req, perrno)) {
419                 tevent_req_received(req);
420                 return -1;
421         }
422         ret = state->total_size;
423         tevent_req_received(req);
424         return ret;
425 }
426
427 struct read_packet_state {
428         int fd;
429         struct tevent_fd *fde;
430         uint8_t *buf;
431         size_t nread;
432         ssize_t (*more)(uint8_t *buf, size_t buflen, void *private_data);
433         void *private_data;
434 };
435
436 static void read_packet_cleanup(struct tevent_req *req,
437                                  enum tevent_req_state req_state);
438 static void read_packet_handler(struct tevent_context *ev,
439                                 struct tevent_fd *fde,
440                                 uint16_t flags, void *private_data);
441
442 struct tevent_req *read_packet_send(TALLOC_CTX *mem_ctx,
443                                     struct tevent_context *ev,
444                                     int fd, size_t initial,
445                                     ssize_t (*more)(uint8_t *buf,
446                                                     size_t buflen,
447                                                     void *private_data),
448                                     void *private_data)
449 {
450         struct tevent_req *req;
451         struct read_packet_state *state;
452
453         req = tevent_req_create(mem_ctx, &state, struct read_packet_state);
454         if (req == NULL) {
455                 return NULL;
456         }
457         state->fd = fd;
458         state->nread = 0;
459         state->more = more;
460         state->private_data = private_data;
461
462         tevent_req_set_cleanup_fn(req, read_packet_cleanup);
463
464         state->buf = talloc_array(state, uint8_t, initial);
465         if (tevent_req_nomem(state->buf, req)) {
466                 return tevent_req_post(req, ev);
467         }
468
469         state->fde = tevent_add_fd(ev, state, fd,
470                                    TEVENT_FD_READ, read_packet_handler,
471                                    req);
472         if (tevent_req_nomem(state->fde, req)) {
473                 return tevent_req_post(req, ev);
474         }
475         return req;
476 }
477
478 static void read_packet_cleanup(struct tevent_req *req,
479                            enum tevent_req_state req_state)
480 {
481         struct read_packet_state *state =
482                 tevent_req_data(req, struct read_packet_state);
483
484         TALLOC_FREE(state->fde);
485 }
486
487 static void read_packet_handler(struct tevent_context *ev,
488                                 struct tevent_fd *fde,
489                                 uint16_t flags, void *private_data)
490 {
491         struct tevent_req *req = talloc_get_type_abort(
492                 private_data, struct tevent_req);
493         struct read_packet_state *state =
494                 tevent_req_data(req, struct read_packet_state);
495         size_t total = talloc_get_size(state->buf);
496         ssize_t nread, more;
497         uint8_t *tmp;
498
499         nread = recv(state->fd, state->buf+state->nread, total-state->nread,
500                      0);
501         if ((nread == -1) && (errno == ENOTSOCK)) {
502                 nread = read(state->fd, state->buf+state->nread,
503                              total-state->nread);
504         }
505         if ((nread == -1) && (errno == EINTR)) {
506                 /* retry */
507                 return;
508         }
509         if (nread == -1) {
510                 tevent_req_error(req, errno);
511                 return;
512         }
513         if (nread == 0) {
514                 tevent_req_error(req, EPIPE);
515                 return;
516         }
517
518         state->nread += nread;
519         if (state->nread < total) {
520                 /* Come back later */
521                 return;
522         }
523
524         /*
525          * We got what was initially requested. See if "more" asks for -- more.
526          */
527         if (state->more == NULL) {
528                 /* Nobody to ask, this is a async read_data */
529                 tevent_req_done(req);
530                 return;
531         }
532
533         more = state->more(state->buf, total, state->private_data);
534         if (more == -1) {
535                 /* We got an invalid packet, tell the caller */
536                 tevent_req_error(req, EIO);
537                 return;
538         }
539         if (more == 0) {
540                 /* We're done, full packet received */
541                 tevent_req_done(req);
542                 return;
543         }
544
545         if (total + more < total) {
546                 tevent_req_error(req, EMSGSIZE);
547                 return;
548         }
549
550         tmp = talloc_realloc(state, state->buf, uint8_t, total+more);
551         if (tevent_req_nomem(tmp, req)) {
552                 return;
553         }
554         state->buf = tmp;
555 }
556
557 ssize_t read_packet_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
558                          uint8_t **pbuf, int *perrno)
559 {
560         struct read_packet_state *state =
561                 tevent_req_data(req, struct read_packet_state);
562
563         if (tevent_req_is_unix_error(req, perrno)) {
564                 tevent_req_received(req);
565                 return -1;
566         }
567         *pbuf = talloc_move(mem_ctx, &state->buf);
568         tevent_req_received(req);
569         return talloc_get_size(*pbuf);
570 }
571
572 struct wait_for_read_state {
573         struct tevent_fd *fde;
574         int fd;
575         bool check_errors;
576 };
577
578 static void wait_for_read_cleanup(struct tevent_req *req,
579                                   enum tevent_req_state req_state);
580 static void wait_for_read_done(struct tevent_context *ev,
581                                struct tevent_fd *fde,
582                                uint16_t flags,
583                                void *private_data);
584
585 struct tevent_req *wait_for_read_send(TALLOC_CTX *mem_ctx,
586                                       struct tevent_context *ev, int fd,
587                                       bool check_errors)
588 {
589         struct tevent_req *req;
590         struct wait_for_read_state *state;
591
592         req = tevent_req_create(mem_ctx, &state, struct wait_for_read_state);
593         if (req == NULL) {
594                 return NULL;
595         }
596
597         tevent_req_set_cleanup_fn(req, wait_for_read_cleanup);
598
599         state->fde = tevent_add_fd(ev, state, fd, TEVENT_FD_READ,
600                                    wait_for_read_done, req);
601         if (tevent_req_nomem(state->fde, req)) {
602                 return tevent_req_post(req, ev);
603         }
604
605         state->fd = fd;
606         state->check_errors = check_errors;
607         return req;
608 }
609
610 static void wait_for_read_cleanup(struct tevent_req *req,
611                                   enum tevent_req_state req_state)
612 {
613         struct wait_for_read_state *state =
614                 tevent_req_data(req, struct wait_for_read_state);
615
616         TALLOC_FREE(state->fde);
617 }
618
619 static void wait_for_read_done(struct tevent_context *ev,
620                                struct tevent_fd *fde,
621                                uint16_t flags,
622                                void *private_data)
623 {
624         struct tevent_req *req = talloc_get_type_abort(
625                 private_data, struct tevent_req);
626         struct wait_for_read_state *state =
627             tevent_req_data(req, struct wait_for_read_state);
628         ssize_t nread;
629         char c;
630
631         if ((flags & TEVENT_FD_READ) == 0) {
632                 return;
633         }
634
635         if (!state->check_errors) {
636                 tevent_req_done(req);
637                 return;
638         }
639
640         nread = recv(state->fd, &c, 1, MSG_PEEK);
641
642         if (nread == 0) {
643                 tevent_req_error(req, EPIPE);
644                 return;
645         }
646
647         if ((nread == -1) && (errno == EINTR)) {
648                 /* come back later */
649                 return;
650         }
651
652         if ((nread == -1) && (errno == ENOTSOCK)) {
653                 /* Ignore this specific error on pipes */
654                 tevent_req_done(req);
655                 return;
656         }
657
658         if (nread == -1) {
659                 tevent_req_error(req, errno);
660                 return;
661         }
662
663         tevent_req_done(req);
664 }
665
666 bool wait_for_read_recv(struct tevent_req *req, int *perr)
667 {
668         int err = tevent_req_simple_recv_unix(req);
669
670         if (err != 0) {
671                 *perr = err;
672                 return false;
673         }
674
675         return true;
676 }
677
678 struct accept_state {
679         struct tevent_fd *fde;
680         int listen_sock;
681         socklen_t addrlen;
682         struct sockaddr_storage addr;
683         int sock;
684 };
685
686 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
687                            uint16_t flags, void *private_data);
688
689 struct tevent_req *accept_send(TALLOC_CTX *mem_ctx, struct tevent_context *ev,
690                                int listen_sock)
691 {
692         struct tevent_req *req;
693         struct accept_state *state;
694
695         req = tevent_req_create(mem_ctx, &state, struct accept_state);
696         if (req == NULL) {
697                 return NULL;
698         }
699
700         state->listen_sock = listen_sock;
701
702         state->fde = tevent_add_fd(ev, state, listen_sock, TEVENT_FD_READ,
703                                    accept_handler, req);
704         if (tevent_req_nomem(state->fde, req)) {
705                 return tevent_req_post(req, ev);
706         }
707         return req;
708 }
709
710 static void accept_handler(struct tevent_context *ev, struct tevent_fd *fde,
711                            uint16_t flags, void *private_data)
712 {
713         struct tevent_req *req = talloc_get_type_abort(
714                 private_data, struct tevent_req);
715         struct accept_state *state = tevent_req_data(req, struct accept_state);
716         int ret;
717
718         TALLOC_FREE(state->fde);
719
720         if ((flags & TEVENT_FD_READ) == 0) {
721                 tevent_req_error(req, EIO);
722                 return;
723         }
724         state->addrlen = sizeof(state->addr);
725
726         ret = accept(state->listen_sock, (struct sockaddr *)&state->addr,
727                      &state->addrlen);
728         if ((ret == -1) && (errno == EINTR)) {
729                 /* retry */
730                 return;
731         }
732         if (ret == -1) {
733                 tevent_req_error(req, errno);
734                 return;
735         }
736         smb_set_close_on_exec(ret);
737         state->sock = ret;
738         tevent_req_done(req);
739 }
740
741 int accept_recv(struct tevent_req *req, struct sockaddr_storage *paddr,
742                 socklen_t *paddrlen, int *perr)
743 {
744         struct accept_state *state = tevent_req_data(req, struct accept_state);
745         int err;
746
747         if (tevent_req_is_unix_error(req, &err)) {
748                 if (perr != NULL) {
749                         *perr = err;
750                 }
751                 return -1;
752         }
753         if (paddr != NULL) {
754                 memcpy(paddr, &state->addr, state->addrlen);
755         }
756         if (paddrlen != NULL) {
757                 *paddrlen = state->addrlen;
758         }
759         return state->sock;
760 }