lib: Split out sys_[read|write] & friends
[samba.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29 #include "lib/sys_rw.h"
30
31 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
32 # error Can not pass file descriptors
33 #endif
34
35 #undef recvmsg
36
37 #ifndef MAP_FILE
38 #define MAP_FILE 0
39 #endif
40
41 struct aio_fork_config {
42         bool erratic_testing_mode;
43 };
44
45 struct mmap_area {
46         size_t size;
47         volatile void *ptr;
48 };
49
50 static int mmap_area_destructor(struct mmap_area *area)
51 {
52         munmap((void *)area->ptr, area->size);
53         return 0;
54 }
55
56 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
57 {
58         struct mmap_area *result;
59         int fd;
60
61         result = talloc(mem_ctx, struct mmap_area);
62         if (result == NULL) {
63                 DEBUG(0, ("talloc failed\n"));
64                 goto fail;
65         }
66
67         fd = open("/dev/zero", O_RDWR);
68         if (fd == -1) {
69                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
70                           strerror(errno)));
71                 goto fail;
72         }
73
74         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
75                            MAP_SHARED|MAP_FILE, fd, 0);
76         close(fd);
77         if (result->ptr == MAP_FAILED) {
78                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
79                 goto fail;
80         }
81
82         result->size = size;
83         talloc_set_destructor(result, mmap_area_destructor);
84
85         return result;
86
87 fail:
88         TALLOC_FREE(result);
89         return NULL;
90 }
91
92 enum cmd_type {
93         READ_CMD,
94         WRITE_CMD,
95         FSYNC_CMD
96 };
97
98 static const char *cmd_type_str(enum cmd_type cmd)
99 {
100         const char *result;
101
102         switch (cmd) {
103         case READ_CMD:
104                 result = "READ";
105                 break;
106         case WRITE_CMD:
107                 result = "WRITE";
108                 break;
109         case FSYNC_CMD:
110                 result = "FSYNC";
111                 break;
112         default:
113                 result = "<UNKNOWN>";
114                 break;
115         }
116         return result;
117 }
118
119 struct rw_cmd {
120         size_t n;
121         off_t offset;
122         enum cmd_type cmd;
123         bool erratic_testing_mode;
124 };
125
126 struct rw_ret {
127         ssize_t size;
128         int ret_errno;
129 };
130
131 struct aio_child_list;
132
133 struct aio_child {
134         struct aio_child *prev, *next;
135         struct aio_child_list *list;
136         pid_t pid;
137         int sockfd;
138         struct mmap_area *map;
139         bool dont_delete;       /* Marked as in use since last cleanup */
140         bool busy;
141 };
142
143 struct aio_child_list {
144         struct aio_child *children;
145         struct tevent_timer *cleanup_event;
146 };
147
148 static void free_aio_children(void **p)
149 {
150         TALLOC_FREE(*p);
151 }
152
153 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
154 {
155         struct msghdr msg;
156         struct iovec iov[1];
157         ssize_t n;
158 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
159         int newfd;
160
161         ZERO_STRUCT(msg);
162         msg.msg_accrights = (caddr_t) &newfd;
163         msg.msg_accrightslen = sizeof(int);
164 #else
165
166         union {
167           struct cmsghdr        cm;
168           char                          control[CMSG_SPACE(sizeof(int))];
169         } control_un;
170         struct cmsghdr  *cmptr;
171
172         ZERO_STRUCT(msg);
173         ZERO_STRUCT(control_un);
174
175         msg.msg_control = control_un.control;
176         msg.msg_controllen = sizeof(control_un.control);
177 #endif
178
179         msg.msg_name = NULL;
180         msg.msg_namelen = 0;
181
182         iov[0].iov_base = (void *)ptr;
183         iov[0].iov_len = nbytes;
184         msg.msg_iov = iov;
185         msg.msg_iovlen = 1;
186
187         if ( (n = recvmsg(fd, &msg, 0)) <= 0) {
188                 return(n);
189         }
190
191 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
192         if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
193             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
194                 if (cmptr->cmsg_level != SOL_SOCKET) {
195                         DEBUG(10, ("control level != SOL_SOCKET"));
196                         errno = EINVAL;
197                         return -1;
198                 }
199                 if (cmptr->cmsg_type != SCM_RIGHTS) {
200                         DEBUG(10, ("control type != SCM_RIGHTS"));
201                         errno = EINVAL;
202                         return -1;
203                 }
204                 memcpy(recvfd, CMSG_DATA(cmptr), sizeof(*recvfd));
205         } else {
206                 *recvfd = -1;           /* descriptor was not passed */
207         }
208 #else
209         if (msg.msg_accrightslen == sizeof(int)) {
210                 *recvfd = newfd;
211         }
212         else {
213                 *recvfd = -1;           /* descriptor was not passed */
214         }
215 #endif
216
217         return(n);
218 }
219
220 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
221 {
222         struct msghdr   msg;
223         struct iovec    iov[1];
224
225 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
226         union {
227                 struct cmsghdr  cm;
228                 char control[CMSG_SPACE(sizeof(int))];
229         } control_un;
230         struct cmsghdr  *cmptr;
231
232         ZERO_STRUCT(msg);
233         ZERO_STRUCT(control_un);
234
235         msg.msg_control = control_un.control;
236         msg.msg_controllen = sizeof(control_un.control);
237
238         cmptr = CMSG_FIRSTHDR(&msg);
239         cmptr->cmsg_len = CMSG_LEN(sizeof(int));
240         cmptr->cmsg_level = SOL_SOCKET;
241         cmptr->cmsg_type = SCM_RIGHTS;
242         memcpy(CMSG_DATA(cmptr), &sendfd, sizeof(sendfd));
243 #else
244         ZERO_STRUCT(msg);
245         msg.msg_accrights = (caddr_t) &sendfd;
246         msg.msg_accrightslen = sizeof(int);
247 #endif
248
249         msg.msg_name = NULL;
250         msg.msg_namelen = 0;
251
252         ZERO_STRUCT(iov);
253         iov[0].iov_base = (void *)ptr;
254         iov[0].iov_len = nbytes;
255         msg.msg_iov = iov;
256         msg.msg_iovlen = 1;
257
258         return (sendmsg(fd, &msg, 0));
259 }
260
261 static void aio_child_cleanup(struct tevent_context *event_ctx,
262                               struct tevent_timer *te,
263                               struct timeval now,
264                               void *private_data)
265 {
266         struct aio_child_list *list = talloc_get_type_abort(
267                 private_data, struct aio_child_list);
268         struct aio_child *child, *next;
269
270         TALLOC_FREE(list->cleanup_event);
271
272         for (child = list->children; child != NULL; child = next) {
273                 next = child->next;
274
275                 if (child->busy) {
276                         DEBUG(10, ("child %d currently active\n",
277                                    (int)child->pid));
278                         continue;
279                 }
280
281                 if (child->dont_delete) {
282                         DEBUG(10, ("Child %d was active since last cleanup\n",
283                                    (int)child->pid));
284                         child->dont_delete = false;
285                         continue;
286                 }
287
288                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
289                            "deleting\n", (int)child->pid));
290
291                 TALLOC_FREE(child);
292                 child = next;
293         }
294
295         if (list->children != NULL) {
296                 /*
297                  * Re-schedule the next cleanup round
298                  */
299                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
300                                                       timeval_add(&now, 30, 0),
301                                                       aio_child_cleanup, list);
302
303         }
304 }
305
306 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
307 {
308         struct aio_child_list *data = NULL;
309
310         if (SMB_VFS_HANDLE_TEST_DATA(handle)) {
311                 SMB_VFS_HANDLE_GET_DATA(handle, data, struct aio_child_list,
312                                         return NULL);
313         }
314
315         if (data == NULL) {
316                 data = talloc_zero(NULL, struct aio_child_list);
317                 if (data == NULL) {
318                         return NULL;
319                 }
320         }
321
322         /*
323          * Regardless of whether the child_list had been around or not, make
324          * sure that we have a cleanup timed event. This timed event will
325          * delete itself when it finds that no children are around anymore.
326          */
327
328         if (data->cleanup_event == NULL) {
329                 data->cleanup_event = tevent_add_timer(server_event_context(), data,
330                                                       timeval_current_ofs(30, 0),
331                                                       aio_child_cleanup, data);
332                 if (data->cleanup_event == NULL) {
333                         TALLOC_FREE(data);
334                         return NULL;
335                 }
336         }
337
338         if (!SMB_VFS_HANDLE_TEST_DATA(handle)) {
339                 SMB_VFS_HANDLE_SET_DATA(handle, data, free_aio_children,
340                                         struct aio_child_list, return False);
341         }
342
343         return data;
344 }
345
346 static void aio_child_loop(int sockfd, struct mmap_area *map)
347 {
348         while (true) {
349                 int fd = -1;
350                 ssize_t ret;
351                 struct rw_cmd cmd_struct;
352                 struct rw_ret ret_struct;
353
354                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
355                 if (ret != sizeof(cmd_struct)) {
356                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
357                                    strerror(errno)));
358                         exit(1);
359                 }
360
361                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
362                            cmd_type_str(cmd_struct.cmd),
363                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
364
365                 if (cmd_struct.erratic_testing_mode) {
366                         /*
367                          * For developer testing, we want erratic behaviour for
368                          * async I/O times
369                          */
370                         uint8_t randval;
371                         unsigned msecs;
372                         /*
373                          * use generate_random_buffer, we just forked from a
374                          * common parent state
375                          */
376                         generate_random_buffer(&randval, sizeof(randval));
377                         msecs = randval + 20;
378                         DEBUG(10, ("delaying for %u msecs\n", msecs));
379                         smb_msleep(msecs);
380                 }
381
382                 ZERO_STRUCT(ret_struct);
383
384                 switch (cmd_struct.cmd) {
385                 case READ_CMD:
386                         ret_struct.size = sys_pread(
387                                 fd, (void *)map->ptr, cmd_struct.n,
388                                 cmd_struct.offset);
389 #if 0
390 /* This breaks "make test" when run with aio_fork module. */
391 #ifdef DEVELOPER
392                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
393 #endif
394 #endif
395                         break;
396                 case WRITE_CMD:
397                         ret_struct.size = sys_pwrite(
398                                 fd, (void *)map->ptr, cmd_struct.n,
399                                 cmd_struct.offset);
400                         break;
401                 case FSYNC_CMD:
402                         ret_struct.size = fsync(fd);
403                         break;
404                 default:
405                         ret_struct.size = -1;
406                         errno = EINVAL;
407                 }
408
409                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
410                            (int)ret_struct.size));
411
412                 if (ret_struct.size == -1) {
413                         ret_struct.ret_errno = errno;
414                 }
415
416                 /*
417                  * Close the fd before telling our parent we're done. The
418                  * parent might close and re-open the file very quickly, and
419                  * with system-level share modes (GPFS) we would get an
420                  * unjustified SHARING_VIOLATION.
421                  */
422                 close(fd);
423
424                 ret = write_data(sockfd, (char *)&ret_struct,
425                                  sizeof(ret_struct));
426                 if (ret != sizeof(ret_struct)) {
427                         DEBUG(10, ("could not write ret_struct: %s\n",
428                                    strerror(errno)));
429                         exit(2);
430                 }
431         }
432 }
433
434 static int aio_child_destructor(struct aio_child *child)
435 {
436         char c=0;
437
438         SMB_ASSERT(!child->busy);
439
440         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
441                         child->pid, child->sockfd));
442
443         /*
444          * closing the sockfd makes the child not return from recvmsg() on RHEL
445          * 5.5 so instead force the child to exit by writing bad data to it
446          */
447         write(child->sockfd, &c, sizeof(c));
448         close(child->sockfd);
449         DLIST_REMOVE(child->list->children, child);
450         return 0;
451 }
452
453 /*
454  * We have to close all fd's in open files, we might incorrectly hold a system
455  * level share mode on a file.
456  */
457
458 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
459                                          void *private_data)
460 {
461         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
462                 close(fsp->fh->fd);
463                 fsp->fh->fd = -1;
464         }
465         return NULL;
466 }
467
468 static int create_aio_child(struct smbd_server_connection *sconn,
469                             struct aio_child_list *children,
470                             size_t map_size,
471                             struct aio_child **presult)
472 {
473         struct aio_child *result;
474         int fdpair[2];
475         int ret;
476
477         fdpair[0] = fdpair[1] = -1;
478
479         result = talloc_zero(children, struct aio_child);
480         if (result == NULL) {
481                 return ENOMEM;
482         }
483
484         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
485                 ret = errno;
486                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
487                 goto fail;
488         }
489
490         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
491
492         result->map = mmap_area_init(result, map_size);
493         if (result->map == NULL) {
494                 ret = errno;
495                 DEBUG(0, ("Could not create mmap area\n"));
496                 goto fail;
497         }
498
499         result->pid = fork();
500         if (result->pid == -1) {
501                 ret = errno;
502                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
503                 goto fail;
504         }
505
506         if (result->pid == 0) {
507                 close(fdpair[0]);
508                 result->sockfd = fdpair[1];
509                 files_forall(sconn, close_fsp_fd, NULL);
510                 aio_child_loop(result->sockfd, result->map);
511         }
512
513         DEBUG(10, ("Child %d created with sockfd %d\n",
514                         result->pid, fdpair[0]));
515
516         result->sockfd = fdpair[0];
517         close(fdpair[1]);
518
519         result->list = children;
520         DLIST_ADD(children->children, result);
521
522         talloc_set_destructor(result, aio_child_destructor);
523
524         *presult = result;
525
526         return 0;
527
528  fail:
529         if (fdpair[0] != -1) close(fdpair[0]);
530         if (fdpair[1] != -1) close(fdpair[1]);
531         TALLOC_FREE(result);
532
533         return ret;
534 }
535
536 static int get_idle_child(struct vfs_handle_struct *handle,
537                           struct aio_child **pchild)
538 {
539         struct aio_child_list *children;
540         struct aio_child *child;
541
542         children = init_aio_children(handle);
543         if (children == NULL) {
544                 return ENOMEM;
545         }
546
547         for (child = children->children; child != NULL; child = child->next) {
548                 if (!child->busy) {
549                         break;
550                 }
551         }
552
553         if (child == NULL) {
554                 int ret;
555
556                 DEBUG(10, ("no idle child found, creating new one\n"));
557
558                 ret = create_aio_child(handle->conn->sconn, children,
559                                           128*1024, &child);
560                 if (ret != 0) {
561                         DEBUG(10, ("create_aio_child failed: %s\n",
562                                    strerror(errno)));
563                         return ret;
564                 }
565         }
566
567         child->dont_delete = true;
568         child->busy = true;
569
570         *pchild = child;
571         return 0;
572 }
573
574 struct aio_fork_pread_state {
575         struct aio_child *child;
576         ssize_t ret;
577         int err;
578 };
579
580 static void aio_fork_pread_done(struct tevent_req *subreq);
581
582 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
583                                               TALLOC_CTX *mem_ctx,
584                                               struct tevent_context *ev,
585                                               struct files_struct *fsp,
586                                               void *data,
587                                               size_t n, off_t offset)
588 {
589         struct tevent_req *req, *subreq;
590         struct aio_fork_pread_state *state;
591         struct rw_cmd cmd;
592         ssize_t written;
593         int err;
594         struct aio_fork_config *config;
595
596         SMB_VFS_HANDLE_GET_DATA(handle, config,
597                                 struct aio_fork_config,
598                                 return NULL);
599
600         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
601         if (req == NULL) {
602                 return NULL;
603         }
604
605         if (n > 128*1024) {
606                 /* TODO: support variable buffers */
607                 tevent_req_error(req, EINVAL);
608                 return tevent_req_post(req, ev);
609         }
610
611         err = get_idle_child(handle, &state->child);
612         if (err != 0) {
613                 tevent_req_error(req, err);
614                 return tevent_req_post(req, ev);
615         }
616
617         ZERO_STRUCT(cmd);
618         cmd.n = n;
619         cmd.offset = offset;
620         cmd.cmd = READ_CMD;
621         cmd.erratic_testing_mode = config->erratic_testing_mode;
622
623         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
624                    (int)state->child->pid));
625
626         /*
627          * Not making this async. We're writing into an empty unix
628          * domain socket. This should never block.
629          */
630         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
631                            fsp->fh->fd);
632         if (written == -1) {
633                 err = errno;
634
635                 TALLOC_FREE(state->child);
636
637                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
638                 tevent_req_error(req, err);
639                 return tevent_req_post(req, ev);
640         }
641
642         subreq = read_packet_send(state, ev, state->child->sockfd,
643                                   sizeof(struct rw_ret), NULL, NULL);
644         if (tevent_req_nomem(subreq, req)) {
645                 TALLOC_FREE(state->child); /* we sent sth down */
646                 return tevent_req_post(req, ev);
647         }
648         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
649         return req;
650 }
651
652 static void aio_fork_pread_done(struct tevent_req *subreq)
653 {
654         struct tevent_req *req = tevent_req_callback_data(
655                 subreq, struct tevent_req);
656         struct aio_fork_pread_state *state = tevent_req_data(
657                 req, struct aio_fork_pread_state);
658         ssize_t nread;
659         uint8_t *buf;
660         int err;
661         struct rw_ret *retbuf;
662
663         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
664         TALLOC_FREE(subreq);
665         if (nread == -1) {
666                 TALLOC_FREE(state->child);
667                 tevent_req_error(req, err);
668                 return;
669         }
670
671         state->child->busy = false;
672
673         retbuf = (struct rw_ret *)buf;
674         state->ret = retbuf->size;
675         state->err = retbuf->ret_errno;
676         tevent_req_done(req);
677 }
678
679 static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err)
680 {
681         struct aio_fork_pread_state *state = tevent_req_data(
682                 req, struct aio_fork_pread_state);
683
684         if (tevent_req_is_unix_error(req, err)) {
685                 return -1;
686         }
687         if (state->ret == -1) {
688                 *err = state->err;
689         }
690         return state->ret;
691 }
692
693 struct aio_fork_pwrite_state {
694         struct aio_child *child;
695         ssize_t ret;
696         int err;
697 };
698
699 static void aio_fork_pwrite_done(struct tevent_req *subreq);
700
701 static struct tevent_req *aio_fork_pwrite_send(
702         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
703         struct tevent_context *ev, struct files_struct *fsp,
704         const void *data, size_t n, off_t offset)
705 {
706         struct tevent_req *req, *subreq;
707         struct aio_fork_pwrite_state *state;
708         struct rw_cmd cmd;
709         ssize_t written;
710         int err;
711         struct aio_fork_config *config;
712         SMB_VFS_HANDLE_GET_DATA(handle, config,
713                                 struct aio_fork_config,
714                                 return NULL);
715
716         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
717         if (req == NULL) {
718                 return NULL;
719         }
720
721         if (n > 128*1024) {
722                 /* TODO: support variable buffers */
723                 tevent_req_error(req, EINVAL);
724                 return tevent_req_post(req, ev);
725         }
726
727         err = get_idle_child(handle, &state->child);
728         if (err != 0) {
729                 tevent_req_error(req, err);
730                 return tevent_req_post(req, ev);
731         }
732
733         ZERO_STRUCT(cmd);
734         cmd.n = n;
735         cmd.offset = offset;
736         cmd.cmd = WRITE_CMD;
737         cmd.erratic_testing_mode = config->erratic_testing_mode;
738
739         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
740                    (int)state->child->pid));
741
742         /*
743          * Not making this async. We're writing into an empty unix
744          * domain socket. This should never block.
745          */
746         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
747                            fsp->fh->fd);
748         if (written == -1) {
749                 err = errno;
750
751                 TALLOC_FREE(state->child);
752
753                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
754                 tevent_req_error(req, err);
755                 return tevent_req_post(req, ev);
756         }
757
758         subreq = read_packet_send(state, ev, state->child->sockfd,
759                                   sizeof(struct rw_ret), NULL, NULL);
760         if (tevent_req_nomem(subreq, req)) {
761                 TALLOC_FREE(state->child); /* we sent sth down */
762                 return tevent_req_post(req, ev);
763         }
764         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
765         return req;
766 }
767
768 static void aio_fork_pwrite_done(struct tevent_req *subreq)
769 {
770         struct tevent_req *req = tevent_req_callback_data(
771                 subreq, struct tevent_req);
772         struct aio_fork_pwrite_state *state = tevent_req_data(
773                 req, struct aio_fork_pwrite_state);
774         ssize_t nread;
775         uint8_t *buf;
776         int err;
777         struct rw_ret *retbuf;
778
779         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
780         TALLOC_FREE(subreq);
781         if (nread == -1) {
782                 TALLOC_FREE(state->child);
783                 tevent_req_error(req, err);
784                 return;
785         }
786
787         state->child->busy = false;
788
789         retbuf = (struct rw_ret *)buf;
790         state->ret = retbuf->size;
791         state->err = retbuf->ret_errno;
792         tevent_req_done(req);
793 }
794
795 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err)
796 {
797         struct aio_fork_pwrite_state *state = tevent_req_data(
798                 req, struct aio_fork_pwrite_state);
799
800         if (tevent_req_is_unix_error(req, err)) {
801                 return -1;
802         }
803         if (state->ret == -1) {
804                 *err = state->err;
805         }
806         return state->ret;
807 }
808
809 struct aio_fork_fsync_state {
810         struct aio_child *child;
811         ssize_t ret;
812         int err;
813 };
814
815 static void aio_fork_fsync_done(struct tevent_req *subreq);
816
817 static struct tevent_req *aio_fork_fsync_send(
818         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
819         struct tevent_context *ev, struct files_struct *fsp)
820 {
821         struct tevent_req *req, *subreq;
822         struct aio_fork_fsync_state *state;
823         struct rw_cmd cmd;
824         ssize_t written;
825         int err;
826         struct aio_fork_config *config;
827
828         SMB_VFS_HANDLE_GET_DATA(handle, config,
829                                 struct aio_fork_config,
830                                 return NULL);
831
832         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
833         if (req == NULL) {
834                 return NULL;
835         }
836
837         err = get_idle_child(handle, &state->child);
838         if (err != 0) {
839                 tevent_req_error(req, err);
840                 return tevent_req_post(req, ev);
841         }
842
843         ZERO_STRUCT(cmd);
844         cmd.cmd = FSYNC_CMD;
845         cmd.erratic_testing_mode = config->erratic_testing_mode;
846
847         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
848                    (int)state->child->pid));
849
850         /*
851          * Not making this async. We're writing into an empty unix
852          * domain socket. This should never block.
853          */
854         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
855                            fsp->fh->fd);
856         if (written == -1) {
857                 err = errno;
858
859                 TALLOC_FREE(state->child);
860
861                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
862                 tevent_req_error(req, err);
863                 return tevent_req_post(req, ev);
864         }
865
866         subreq = read_packet_send(state, ev, state->child->sockfd,
867                                   sizeof(struct rw_ret), NULL, NULL);
868         if (tevent_req_nomem(subreq, req)) {
869                 TALLOC_FREE(state->child); /* we sent sth down */
870                 return tevent_req_post(req, ev);
871         }
872         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
873         return req;
874 }
875
876 static void aio_fork_fsync_done(struct tevent_req *subreq)
877 {
878         struct tevent_req *req = tevent_req_callback_data(
879                 subreq, struct tevent_req);
880         struct aio_fork_fsync_state *state = tevent_req_data(
881                 req, struct aio_fork_fsync_state);
882         ssize_t nread;
883         uint8_t *buf;
884         int err;
885         struct rw_ret *retbuf;
886
887         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
888         TALLOC_FREE(subreq);
889         if (nread == -1) {
890                 TALLOC_FREE(state->child);
891                 tevent_req_error(req, err);
892                 return;
893         }
894
895         state->child->busy = false;
896
897         retbuf = (struct rw_ret *)buf;
898         state->ret = retbuf->size;
899         state->err = retbuf->ret_errno;
900         tevent_req_done(req);
901 }
902
903 static int aio_fork_fsync_recv(struct tevent_req *req, int *err)
904 {
905         struct aio_fork_fsync_state *state = tevent_req_data(
906                 req, struct aio_fork_fsync_state);
907
908         if (tevent_req_is_unix_error(req, err)) {
909                 return -1;
910         }
911         if (state->ret == -1) {
912                 *err = state->err;
913         }
914         return state->ret;
915 }
916
917 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
918                             const char *user)
919 {
920         int ret;
921         struct aio_fork_config *config;
922         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
923
924         if (ret < 0) {
925                 return ret;
926         }
927
928         config = talloc_zero(handle->conn, struct aio_fork_config);
929         if (!config) {
930                 SMB_VFS_NEXT_DISCONNECT(handle);
931                 DEBUG(0, ("talloc_zero() failed\n"));
932                 return -1;
933         }
934
935         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
936                                                     "erratic_testing_mode", false);
937         
938         SMB_VFS_HANDLE_SET_DATA(handle, config,
939                                 NULL, struct aio_fork_config,
940                                 return -1);
941
942         /*********************************************************************
943          * How many threads to initialize ?
944          * 100 per process seems insane as a default until you realize that
945          * (a) Threads terminate after 1 second when idle.
946          * (b) Throttling is done in SMB2 via the crediting algorithm.
947          * (c) SMB1 clients are limited to max_mux (50) outstanding
948          *     requests and Windows clients don't use this anyway.
949          * Essentially we want this to be unlimited unless smb.conf
950          * says different.
951          *********************************************************************/
952         aio_pending_size = 100;
953         return 0;
954 }
955
956 static struct vfs_fn_pointers vfs_aio_fork_fns = {
957         .connect_fn = aio_fork_connect,
958         .pread_send_fn = aio_fork_pread_send,
959         .pread_recv_fn = aio_fork_pread_recv,
960         .pwrite_send_fn = aio_fork_pwrite_send,
961         .pwrite_recv_fn = aio_fork_pwrite_recv,
962         .fsync_send_fn = aio_fork_fsync_send,
963         .fsync_recv_fn = aio_fork_fsync_recv,
964 };
965
966 NTSTATUS vfs_aio_fork_init(void);
967 NTSTATUS vfs_aio_fork_init(void)
968 {
969         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
970                                 "aio_fork", &vfs_aio_fork_fns);
971 }