s3-vfs: Don't leak file descriptor.
[samba.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29
30 #undef recvmsg
31
32 #ifndef MAP_FILE
33 #define MAP_FILE 0
34 #endif
35
36 struct aio_fork_config {
37         bool erratic_testing_mode;
38 };
39
40 struct mmap_area {
41         size_t size;
42         volatile void *ptr;
43 };
44
45 static int mmap_area_destructor(struct mmap_area *area)
46 {
47         munmap((void *)area->ptr, area->size);
48         return 0;
49 }
50
51 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
52 {
53         struct mmap_area *result;
54         int fd;
55
56         result = talloc(mem_ctx, struct mmap_area);
57         if (result == NULL) {
58                 DEBUG(0, ("talloc failed\n"));
59                 goto fail;
60         }
61
62         fd = open("/dev/zero", O_RDWR);
63         if (fd == -1) {
64                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
65                           strerror(errno)));
66                 goto fail;
67         }
68
69         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
70                            MAP_SHARED|MAP_FILE, fd, 0);
71         close(fd);
72         if (result->ptr == MAP_FAILED) {
73                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
74                 goto fail;
75         }
76
77         result->size = size;
78         talloc_set_destructor(result, mmap_area_destructor);
79
80         return result;
81
82 fail:
83         TALLOC_FREE(result);
84         return NULL;
85 }
86
87 enum cmd_type {
88         READ_CMD,
89         WRITE_CMD,
90         FSYNC_CMD
91 };
92
93 static const char *cmd_type_str(enum cmd_type cmd)
94 {
95         const char *result;
96
97         switch (cmd) {
98         case READ_CMD:
99                 result = "READ";
100                 break;
101         case WRITE_CMD:
102                 result = "WRITE";
103                 break;
104         case FSYNC_CMD:
105                 result = "FSYNC";
106                 break;
107         default:
108                 result = "<UNKNOWN>";
109                 break;
110         }
111         return result;
112 }
113
114 struct rw_cmd {
115         size_t n;
116         off_t offset;
117         enum cmd_type cmd;
118         bool erratic_testing_mode;
119 };
120
121 struct rw_ret {
122         ssize_t size;
123         int ret_errno;
124 };
125
126 struct aio_child_list;
127
128 struct aio_child {
129         struct aio_child *prev, *next;
130         struct aio_child_list *list;
131         pid_t pid;
132         int sockfd;
133         struct mmap_area *map;
134         bool dont_delete;       /* Marked as in use since last cleanup */
135         bool busy;
136 };
137
138 struct aio_child_list {
139         struct aio_child *children;
140         struct tevent_timer *cleanup_event;
141 };
142
143 static void free_aio_children(void **p)
144 {
145         TALLOC_FREE(*p);
146 }
147
148 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
149 {
150         struct msghdr msg;
151         struct iovec iov[1];
152         ssize_t n;
153 #ifndef HAVE_MSGHDR_MSG_CONTROL
154         int newfd;
155 #endif
156
157 #ifdef  HAVE_MSGHDR_MSG_CONTROL
158         union {
159           struct cmsghdr        cm;
160           char                          control[CMSG_SPACE(sizeof(int))];
161         } control_un;
162         struct cmsghdr  *cmptr;
163
164         msg.msg_control = control_un.control;
165         msg.msg_controllen = sizeof(control_un.control);
166 #else
167 #if HAVE_MSGHDR_MSG_ACCTRIGHTS
168         msg.msg_accrights = (caddr_t) &newfd;
169         msg.msg_accrightslen = sizeof(int);
170 #else
171 #error Can not pass file descriptors
172 #endif
173 #endif
174
175         msg.msg_name = NULL;
176         msg.msg_namelen = 0;
177         msg.msg_flags = 0;
178
179         iov[0].iov_base = (void *)ptr;
180         iov[0].iov_len = nbytes;
181         msg.msg_iov = iov;
182         msg.msg_iovlen = 1;
183
184         if ( (n = recvmsg(fd, &msg, 0)) <= 0) {
185                 return(n);
186         }
187
188 #ifdef  HAVE_MSGHDR_MSG_CONTROL
189         if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
190             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
191                 if (cmptr->cmsg_level != SOL_SOCKET) {
192                         DEBUG(10, ("control level != SOL_SOCKET"));
193                         errno = EINVAL;
194                         return -1;
195                 }
196                 if (cmptr->cmsg_type != SCM_RIGHTS) {
197                         DEBUG(10, ("control type != SCM_RIGHTS"));
198                         errno = EINVAL;
199                         return -1;
200                 }
201                 memcpy(recvfd, CMSG_DATA(cmptr), sizeof(*recvfd));
202         } else {
203                 *recvfd = -1;           /* descriptor was not passed */
204         }
205 #else
206         if (msg.msg_accrightslen == sizeof(int)) {
207                 *recvfd = newfd;
208         }
209         else {
210                 *recvfd = -1;           /* descriptor was not passed */
211         }
212 #endif
213
214         return(n);
215 }
216
217 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
218 {
219         struct msghdr   msg;
220         struct iovec    iov[1];
221
222 #ifdef  HAVE_MSGHDR_MSG_CONTROL
223         union {
224                 struct cmsghdr  cm;
225                 char control[CMSG_SPACE(sizeof(int))];
226         } control_un;
227         struct cmsghdr  *cmptr;
228
229         ZERO_STRUCT(msg);
230         ZERO_STRUCT(control_un);
231
232         msg.msg_control = control_un.control;
233         msg.msg_controllen = sizeof(control_un.control);
234
235         cmptr = CMSG_FIRSTHDR(&msg);
236         cmptr->cmsg_len = CMSG_LEN(sizeof(int));
237         cmptr->cmsg_level = SOL_SOCKET;
238         cmptr->cmsg_type = SCM_RIGHTS;
239         memcpy(CMSG_DATA(cmptr), &sendfd, sizeof(sendfd));
240 #else
241         ZERO_STRUCT(msg);
242         msg.msg_accrights = (caddr_t) &sendfd;
243         msg.msg_accrightslen = sizeof(int);
244 #endif
245
246         msg.msg_name = NULL;
247         msg.msg_namelen = 0;
248
249         ZERO_STRUCT(iov);
250         iov[0].iov_base = (void *)ptr;
251         iov[0].iov_len = nbytes;
252         msg.msg_iov = iov;
253         msg.msg_iovlen = 1;
254
255         return (sendmsg(fd, &msg, 0));
256 }
257
258 static void aio_child_cleanup(struct tevent_context *event_ctx,
259                               struct tevent_timer *te,
260                               struct timeval now,
261                               void *private_data)
262 {
263         struct aio_child_list *list = talloc_get_type_abort(
264                 private_data, struct aio_child_list);
265         struct aio_child *child, *next;
266
267         TALLOC_FREE(list->cleanup_event);
268
269         for (child = list->children; child != NULL; child = next) {
270                 next = child->next;
271
272                 if (child->busy) {
273                         DEBUG(10, ("child %d currently active\n",
274                                    (int)child->pid));
275                         continue;
276                 }
277
278                 if (child->dont_delete) {
279                         DEBUG(10, ("Child %d was active since last cleanup\n",
280                                    (int)child->pid));
281                         child->dont_delete = false;
282                         continue;
283                 }
284
285                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
286                            "deleting\n", (int)child->pid));
287
288                 TALLOC_FREE(child);
289                 child = next;
290         }
291
292         if (list->children != NULL) {
293                 /*
294                  * Re-schedule the next cleanup round
295                  */
296                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
297                                                       timeval_add(&now, 30, 0),
298                                                       aio_child_cleanup, list);
299
300         }
301 }
302
303 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
304 {
305         struct aio_child_list *data = NULL;
306
307         if (SMB_VFS_HANDLE_TEST_DATA(handle)) {
308                 SMB_VFS_HANDLE_GET_DATA(handle, data, struct aio_child_list,
309                                         return NULL);
310         }
311
312         if (data == NULL) {
313                 data = talloc_zero(NULL, struct aio_child_list);
314                 if (data == NULL) {
315                         return NULL;
316                 }
317         }
318
319         /*
320          * Regardless of whether the child_list had been around or not, make
321          * sure that we have a cleanup timed event. This timed event will
322          * delete itself when it finds that no children are around anymore.
323          */
324
325         if (data->cleanup_event == NULL) {
326                 data->cleanup_event = tevent_add_timer(server_event_context(), data,
327                                                       timeval_current_ofs(30, 0),
328                                                       aio_child_cleanup, data);
329                 if (data->cleanup_event == NULL) {
330                         TALLOC_FREE(data);
331                         return NULL;
332                 }
333         }
334
335         if (!SMB_VFS_HANDLE_TEST_DATA(handle)) {
336                 SMB_VFS_HANDLE_SET_DATA(handle, data, free_aio_children,
337                                         struct aio_child_list, return False);
338         }
339
340         return data;
341 }
342
343 static void aio_child_loop(int sockfd, struct mmap_area *map)
344 {
345         while (true) {
346                 int fd = -1;
347                 ssize_t ret;
348                 struct rw_cmd cmd_struct;
349                 struct rw_ret ret_struct;
350
351                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
352                 if (ret != sizeof(cmd_struct)) {
353                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
354                                    strerror(errno)));
355                         exit(1);
356                 }
357
358                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
359                            cmd_type_str(cmd_struct.cmd),
360                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
361
362                 if (cmd_struct.erratic_testing_mode) {
363                         /*
364                          * For developer testing, we want erratic behaviour for
365                          * async I/O times
366                          */
367                         uint8_t randval;
368                         unsigned msecs;
369                         /*
370                          * use generate_random_buffer, we just forked from a
371                          * common parent state
372                          */
373                         generate_random_buffer(&randval, sizeof(randval));
374                         msecs = randval + 20;
375                         DEBUG(10, ("delaying for %u msecs\n", msecs));
376                         smb_msleep(msecs);
377                 }
378
379                 ZERO_STRUCT(ret_struct);
380
381                 switch (cmd_struct.cmd) {
382                 case READ_CMD:
383                         ret_struct.size = sys_pread(
384                                 fd, (void *)map->ptr, cmd_struct.n,
385                                 cmd_struct.offset);
386 #if 0
387 /* This breaks "make test" when run with aio_fork module. */
388 #ifdef DEVELOPER
389                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
390 #endif
391 #endif
392                         break;
393                 case WRITE_CMD:
394                         ret_struct.size = sys_pwrite(
395                                 fd, (void *)map->ptr, cmd_struct.n,
396                                 cmd_struct.offset);
397                         break;
398                 case FSYNC_CMD:
399                         ret_struct.size = fsync(fd);
400                         break;
401                 default:
402                         ret_struct.size = -1;
403                         errno = EINVAL;
404                 }
405
406                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
407                            (int)ret_struct.size));
408
409                 if (ret_struct.size == -1) {
410                         ret_struct.ret_errno = errno;
411                 }
412
413                 /*
414                  * Close the fd before telling our parent we're done. The
415                  * parent might close and re-open the file very quickly, and
416                  * with system-level share modes (GPFS) we would get an
417                  * unjustified SHARING_VIOLATION.
418                  */
419                 close(fd);
420
421                 ret = write_data(sockfd, (char *)&ret_struct,
422                                  sizeof(ret_struct));
423                 if (ret != sizeof(ret_struct)) {
424                         DEBUG(10, ("could not write ret_struct: %s\n",
425                                    strerror(errno)));
426                         exit(2);
427                 }
428         }
429 }
430
431 static int aio_child_destructor(struct aio_child *child)
432 {
433         char c=0;
434
435         SMB_ASSERT(!child->busy);
436
437         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
438                         child->pid, child->sockfd));
439
440         /*
441          * closing the sockfd makes the child not return from recvmsg() on RHEL
442          * 5.5 so instead force the child to exit by writing bad data to it
443          */
444         write(child->sockfd, &c, sizeof(c));
445         close(child->sockfd);
446         DLIST_REMOVE(child->list->children, child);
447         return 0;
448 }
449
450 /*
451  * We have to close all fd's in open files, we might incorrectly hold a system
452  * level share mode on a file.
453  */
454
455 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
456                                          void *private_data)
457 {
458         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
459                 close(fsp->fh->fd);
460                 fsp->fh->fd = -1;
461         }
462         return NULL;
463 }
464
465 static int create_aio_child(struct smbd_server_connection *sconn,
466                             struct aio_child_list *children,
467                             size_t map_size,
468                             struct aio_child **presult)
469 {
470         struct aio_child *result;
471         int fdpair[2];
472         int ret;
473
474         fdpair[0] = fdpair[1] = -1;
475
476         result = talloc_zero(children, struct aio_child);
477         if (result == NULL) {
478                 return ENOMEM;
479         }
480
481         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
482                 ret = errno;
483                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
484                 goto fail;
485         }
486
487         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
488
489         result->map = mmap_area_init(result, map_size);
490         if (result->map == NULL) {
491                 ret = errno;
492                 DEBUG(0, ("Could not create mmap area\n"));
493                 goto fail;
494         }
495
496         result->pid = fork();
497         if (result->pid == -1) {
498                 ret = errno;
499                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
500                 goto fail;
501         }
502
503         if (result->pid == 0) {
504                 close(fdpair[0]);
505                 result->sockfd = fdpair[1];
506                 files_forall(sconn, close_fsp_fd, NULL);
507                 aio_child_loop(result->sockfd, result->map);
508         }
509
510         DEBUG(10, ("Child %d created with sockfd %d\n",
511                         result->pid, fdpair[0]));
512
513         result->sockfd = fdpair[0];
514         close(fdpair[1]);
515
516         result->list = children;
517         DLIST_ADD(children->children, result);
518
519         talloc_set_destructor(result, aio_child_destructor);
520
521         *presult = result;
522
523         return 0;
524
525  fail:
526         if (fdpair[0] != -1) close(fdpair[0]);
527         if (fdpair[1] != -1) close(fdpair[1]);
528         TALLOC_FREE(result);
529
530         return ret;
531 }
532
533 static int get_idle_child(struct vfs_handle_struct *handle,
534                           struct aio_child **pchild)
535 {
536         struct aio_child_list *children;
537         struct aio_child *child;
538
539         children = init_aio_children(handle);
540         if (children == NULL) {
541                 return ENOMEM;
542         }
543
544         for (child = children->children; child != NULL; child = child->next) {
545                 if (!child->busy) {
546                         break;
547                 }
548         }
549
550         if (child == NULL) {
551                 int ret;
552
553                 DEBUG(10, ("no idle child found, creating new one\n"));
554
555                 ret = create_aio_child(handle->conn->sconn, children,
556                                           128*1024, &child);
557                 if (ret != 0) {
558                         DEBUG(10, ("create_aio_child failed: %s\n",
559                                    strerror(errno)));
560                         return ret;
561                 }
562         }
563
564         child->dont_delete = true;
565         child->busy = true;
566
567         *pchild = child;
568         return 0;
569 }
570
571 struct aio_fork_pread_state {
572         struct aio_child *child;
573         ssize_t ret;
574         int err;
575 };
576
577 static void aio_fork_pread_done(struct tevent_req *subreq);
578
579 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
580                                               TALLOC_CTX *mem_ctx,
581                                               struct tevent_context *ev,
582                                               struct files_struct *fsp,
583                                               void *data,
584                                               size_t n, off_t offset)
585 {
586         struct tevent_req *req, *subreq;
587         struct aio_fork_pread_state *state;
588         struct rw_cmd cmd;
589         ssize_t written;
590         int err;
591         struct aio_fork_config *config;
592
593         SMB_VFS_HANDLE_GET_DATA(handle, config,
594                                 struct aio_fork_config,
595                                 return NULL);
596
597         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
598         if (req == NULL) {
599                 return NULL;
600         }
601
602         if (n > 128*1024) {
603                 /* TODO: support variable buffers */
604                 tevent_req_error(req, EINVAL);
605                 return tevent_req_post(req, ev);
606         }
607
608         err = get_idle_child(handle, &state->child);
609         if (err != 0) {
610                 tevent_req_error(req, err);
611                 return tevent_req_post(req, ev);
612         }
613
614         ZERO_STRUCT(cmd);
615         cmd.n = n;
616         cmd.offset = offset;
617         cmd.cmd = READ_CMD;
618         cmd.erratic_testing_mode = config->erratic_testing_mode;
619
620         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
621                    (int)state->child->pid));
622
623         /*
624          * Not making this async. We're writing into an empty unix
625          * domain socket. This should never block.
626          */
627         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
628                            fsp->fh->fd);
629         if (written == -1) {
630                 err = errno;
631
632                 TALLOC_FREE(state->child);
633
634                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
635                 tevent_req_error(req, err);
636                 return tevent_req_post(req, ev);
637         }
638
639         subreq = read_packet_send(state, ev, state->child->sockfd,
640                                   sizeof(struct rw_ret), NULL, NULL);
641         if (tevent_req_nomem(subreq, req)) {
642                 TALLOC_FREE(state->child); /* we sent sth down */
643                 return tevent_req_post(req, ev);
644         }
645         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
646         return req;
647 }
648
649 static void aio_fork_pread_done(struct tevent_req *subreq)
650 {
651         struct tevent_req *req = tevent_req_callback_data(
652                 subreq, struct tevent_req);
653         struct aio_fork_pread_state *state = tevent_req_data(
654                 req, struct aio_fork_pread_state);
655         ssize_t nread;
656         uint8_t *buf;
657         int err;
658         struct rw_ret *retbuf;
659
660         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
661         TALLOC_FREE(subreq);
662         if (nread == -1) {
663                 TALLOC_FREE(state->child);
664                 tevent_req_error(req, err);
665                 return;
666         }
667
668         state->child->busy = false;
669
670         retbuf = (struct rw_ret *)buf;
671         state->ret = retbuf->size;
672         state->err = retbuf->ret_errno;
673         tevent_req_done(req);
674 }
675
676 static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err)
677 {
678         struct aio_fork_pread_state *state = tevent_req_data(
679                 req, struct aio_fork_pread_state);
680
681         if (tevent_req_is_unix_error(req, err)) {
682                 return -1;
683         }
684         if (state->ret == -1) {
685                 *err = state->err;
686         }
687         return state->ret;
688 }
689
690 struct aio_fork_pwrite_state {
691         struct aio_child *child;
692         ssize_t ret;
693         int err;
694 };
695
696 static void aio_fork_pwrite_done(struct tevent_req *subreq);
697
698 static struct tevent_req *aio_fork_pwrite_send(
699         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
700         struct tevent_context *ev, struct files_struct *fsp,
701         const void *data, size_t n, off_t offset)
702 {
703         struct tevent_req *req, *subreq;
704         struct aio_fork_pwrite_state *state;
705         struct rw_cmd cmd;
706         ssize_t written;
707         int err;
708         struct aio_fork_config *config;
709         SMB_VFS_HANDLE_GET_DATA(handle, config,
710                                 struct aio_fork_config,
711                                 return NULL);
712
713         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
714         if (req == NULL) {
715                 return NULL;
716         }
717
718         if (n > 128*1024) {
719                 /* TODO: support variable buffers */
720                 tevent_req_error(req, EINVAL);
721                 return tevent_req_post(req, ev);
722         }
723
724         err = get_idle_child(handle, &state->child);
725         if (err != 0) {
726                 tevent_req_error(req, err);
727                 return tevent_req_post(req, ev);
728         }
729
730         ZERO_STRUCT(cmd);
731         cmd.n = n;
732         cmd.offset = offset;
733         cmd.cmd = WRITE_CMD;
734         cmd.erratic_testing_mode = config->erratic_testing_mode;
735
736         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
737                    (int)state->child->pid));
738
739         /*
740          * Not making this async. We're writing into an empty unix
741          * domain socket. This should never block.
742          */
743         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
744                            fsp->fh->fd);
745         if (written == -1) {
746                 err = errno;
747
748                 TALLOC_FREE(state->child);
749
750                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
751                 tevent_req_error(req, err);
752                 return tevent_req_post(req, ev);
753         }
754
755         subreq = read_packet_send(state, ev, state->child->sockfd,
756                                   sizeof(struct rw_ret), NULL, NULL);
757         if (tevent_req_nomem(subreq, req)) {
758                 TALLOC_FREE(state->child); /* we sent sth down */
759                 return tevent_req_post(req, ev);
760         }
761         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
762         return req;
763 }
764
765 static void aio_fork_pwrite_done(struct tevent_req *subreq)
766 {
767         struct tevent_req *req = tevent_req_callback_data(
768                 subreq, struct tevent_req);
769         struct aio_fork_pwrite_state *state = tevent_req_data(
770                 req, struct aio_fork_pwrite_state);
771         ssize_t nread;
772         uint8_t *buf;
773         int err;
774         struct rw_ret *retbuf;
775
776         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
777         TALLOC_FREE(subreq);
778         if (nread == -1) {
779                 TALLOC_FREE(state->child);
780                 tevent_req_error(req, err);
781                 return;
782         }
783
784         state->child->busy = false;
785
786         retbuf = (struct rw_ret *)buf;
787         state->ret = retbuf->size;
788         state->err = retbuf->ret_errno;
789         tevent_req_done(req);
790 }
791
792 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err)
793 {
794         struct aio_fork_pwrite_state *state = tevent_req_data(
795                 req, struct aio_fork_pwrite_state);
796
797         if (tevent_req_is_unix_error(req, err)) {
798                 return -1;
799         }
800         if (state->ret == -1) {
801                 *err = state->err;
802         }
803         return state->ret;
804 }
805
806 struct aio_fork_fsync_state {
807         struct aio_child *child;
808         ssize_t ret;
809         int err;
810 };
811
812 static void aio_fork_fsync_done(struct tevent_req *subreq);
813
814 static struct tevent_req *aio_fork_fsync_send(
815         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
816         struct tevent_context *ev, struct files_struct *fsp)
817 {
818         struct tevent_req *req, *subreq;
819         struct aio_fork_fsync_state *state;
820         struct rw_cmd cmd;
821         ssize_t written;
822         int err;
823         struct aio_fork_config *config;
824
825         SMB_VFS_HANDLE_GET_DATA(handle, config,
826                                 struct aio_fork_config,
827                                 return NULL);
828
829         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
830         if (req == NULL) {
831                 return NULL;
832         }
833
834         err = get_idle_child(handle, &state->child);
835         if (err != 0) {
836                 tevent_req_error(req, err);
837                 return tevent_req_post(req, ev);
838         }
839
840         ZERO_STRUCT(cmd);
841         cmd.cmd = FSYNC_CMD;
842         cmd.erratic_testing_mode = config->erratic_testing_mode;
843
844         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
845                    (int)state->child->pid));
846
847         /*
848          * Not making this async. We're writing into an empty unix
849          * domain socket. This should never block.
850          */
851         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
852                            fsp->fh->fd);
853         if (written == -1) {
854                 err = errno;
855
856                 TALLOC_FREE(state->child);
857
858                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
859                 tevent_req_error(req, err);
860                 return tevent_req_post(req, ev);
861         }
862
863         subreq = read_packet_send(state, ev, state->child->sockfd,
864                                   sizeof(struct rw_ret), NULL, NULL);
865         if (tevent_req_nomem(subreq, req)) {
866                 TALLOC_FREE(state->child); /* we sent sth down */
867                 return tevent_req_post(req, ev);
868         }
869         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
870         return req;
871 }
872
873 static void aio_fork_fsync_done(struct tevent_req *subreq)
874 {
875         struct tevent_req *req = tevent_req_callback_data(
876                 subreq, struct tevent_req);
877         struct aio_fork_fsync_state *state = tevent_req_data(
878                 req, struct aio_fork_fsync_state);
879         ssize_t nread;
880         uint8_t *buf;
881         int err;
882         struct rw_ret *retbuf;
883
884         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
885         TALLOC_FREE(subreq);
886         if (nread == -1) {
887                 TALLOC_FREE(state->child);
888                 tevent_req_error(req, err);
889                 return;
890         }
891
892         state->child->busy = false;
893
894         retbuf = (struct rw_ret *)buf;
895         state->ret = retbuf->size;
896         state->err = retbuf->ret_errno;
897         tevent_req_done(req);
898 }
899
900 static int aio_fork_fsync_recv(struct tevent_req *req, int *err)
901 {
902         struct aio_fork_fsync_state *state = tevent_req_data(
903                 req, struct aio_fork_fsync_state);
904
905         if (tevent_req_is_unix_error(req, err)) {
906                 return -1;
907         }
908         if (state->ret == -1) {
909                 *err = state->err;
910         }
911         return state->ret;
912 }
913
914 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
915                             const char *user)
916 {
917         int ret;
918         struct aio_fork_config *config;
919         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
920
921         if (ret < 0) {
922                 return ret;
923         }
924
925         config = talloc_zero(handle->conn, struct aio_fork_config);
926         if (!config) {
927                 SMB_VFS_NEXT_DISCONNECT(handle);
928                 DEBUG(0, ("talloc_zero() failed\n"));
929                 return -1;
930         }
931
932         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
933                                                     "erratic_testing_mode", false);
934         
935         SMB_VFS_HANDLE_SET_DATA(handle, config,
936                                 NULL, struct aio_fork_config,
937                                 return -1);
938
939         /*********************************************************************
940          * How many threads to initialize ?
941          * 100 per process seems insane as a default until you realize that
942          * (a) Threads terminate after 1 second when idle.
943          * (b) Throttling is done in SMB2 via the crediting algorithm.
944          * (c) SMB1 clients are limited to max_mux (50) outstanding
945          *     requests and Windows clients don't use this anyway.
946          * Essentially we want this to be unlimited unless smb.conf
947          * says different.
948          *********************************************************************/
949         aio_pending_size = 100;
950         return 0;
951 }
952
953 static struct vfs_fn_pointers vfs_aio_fork_fns = {
954         .connect_fn = aio_fork_connect,
955         .pread_send_fn = aio_fork_pread_send,
956         .pread_recv_fn = aio_fork_pread_recv,
957         .pwrite_send_fn = aio_fork_pwrite_send,
958         .pwrite_recv_fn = aio_fork_pwrite_recv,
959         .fsync_send_fn = aio_fork_fsync_send,
960         .fsync_recv_fn = aio_fork_fsync_recv,
961 };
962
963 NTSTATUS vfs_aio_fork_init(void);
964 NTSTATUS vfs_aio_fork_init(void)
965 {
966         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
967                                 "aio_fork", &vfs_aio_fork_fns);
968 }