lib: Split out write_data[_iov]
[samba.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29 #include "lib/sys_rw.h"
30 #include "lib/sys_rw_data.h"
31
32 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
33 # error Can not pass file descriptors
34 #endif
35
36 #undef recvmsg
37
38 #ifndef MAP_FILE
39 #define MAP_FILE 0
40 #endif
41
42 struct aio_fork_config {
43         bool erratic_testing_mode;
44 };
45
46 struct mmap_area {
47         size_t size;
48         volatile void *ptr;
49 };
50
51 static int mmap_area_destructor(struct mmap_area *area)
52 {
53         munmap((void *)area->ptr, area->size);
54         return 0;
55 }
56
57 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
58 {
59         struct mmap_area *result;
60         int fd;
61
62         result = talloc(mem_ctx, struct mmap_area);
63         if (result == NULL) {
64                 DEBUG(0, ("talloc failed\n"));
65                 goto fail;
66         }
67
68         fd = open("/dev/zero", O_RDWR);
69         if (fd == -1) {
70                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
71                           strerror(errno)));
72                 goto fail;
73         }
74
75         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
76                            MAP_SHARED|MAP_FILE, fd, 0);
77         close(fd);
78         if (result->ptr == MAP_FAILED) {
79                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
80                 goto fail;
81         }
82
83         result->size = size;
84         talloc_set_destructor(result, mmap_area_destructor);
85
86         return result;
87
88 fail:
89         TALLOC_FREE(result);
90         return NULL;
91 }
92
93 enum cmd_type {
94         READ_CMD,
95         WRITE_CMD,
96         FSYNC_CMD
97 };
98
99 static const char *cmd_type_str(enum cmd_type cmd)
100 {
101         const char *result;
102
103         switch (cmd) {
104         case READ_CMD:
105                 result = "READ";
106                 break;
107         case WRITE_CMD:
108                 result = "WRITE";
109                 break;
110         case FSYNC_CMD:
111                 result = "FSYNC";
112                 break;
113         default:
114                 result = "<UNKNOWN>";
115                 break;
116         }
117         return result;
118 }
119
120 struct rw_cmd {
121         size_t n;
122         off_t offset;
123         enum cmd_type cmd;
124         bool erratic_testing_mode;
125 };
126
127 struct rw_ret {
128         ssize_t size;
129         int ret_errno;
130 };
131
132 struct aio_child_list;
133
134 struct aio_child {
135         struct aio_child *prev, *next;
136         struct aio_child_list *list;
137         pid_t pid;
138         int sockfd;
139         struct mmap_area *map;
140         bool dont_delete;       /* Marked as in use since last cleanup */
141         bool busy;
142 };
143
144 struct aio_child_list {
145         struct aio_child *children;
146         struct tevent_timer *cleanup_event;
147 };
148
149 static void free_aio_children(void **p)
150 {
151         TALLOC_FREE(*p);
152 }
153
154 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
155 {
156         struct msghdr msg;
157         struct iovec iov[1];
158         ssize_t n;
159 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
160         int newfd;
161
162         ZERO_STRUCT(msg);
163         msg.msg_accrights = (caddr_t) &newfd;
164         msg.msg_accrightslen = sizeof(int);
165 #else
166
167         union {
168           struct cmsghdr        cm;
169           char                          control[CMSG_SPACE(sizeof(int))];
170         } control_un;
171         struct cmsghdr  *cmptr;
172
173         ZERO_STRUCT(msg);
174         ZERO_STRUCT(control_un);
175
176         msg.msg_control = control_un.control;
177         msg.msg_controllen = sizeof(control_un.control);
178 #endif
179
180         msg.msg_name = NULL;
181         msg.msg_namelen = 0;
182
183         iov[0].iov_base = (void *)ptr;
184         iov[0].iov_len = nbytes;
185         msg.msg_iov = iov;
186         msg.msg_iovlen = 1;
187
188         if ( (n = recvmsg(fd, &msg, 0)) <= 0) {
189                 return(n);
190         }
191
192 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
193         if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
194             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
195                 if (cmptr->cmsg_level != SOL_SOCKET) {
196                         DEBUG(10, ("control level != SOL_SOCKET"));
197                         errno = EINVAL;
198                         return -1;
199                 }
200                 if (cmptr->cmsg_type != SCM_RIGHTS) {
201                         DEBUG(10, ("control type != SCM_RIGHTS"));
202                         errno = EINVAL;
203                         return -1;
204                 }
205                 memcpy(recvfd, CMSG_DATA(cmptr), sizeof(*recvfd));
206         } else {
207                 *recvfd = -1;           /* descriptor was not passed */
208         }
209 #else
210         if (msg.msg_accrightslen == sizeof(int)) {
211                 *recvfd = newfd;
212         }
213         else {
214                 *recvfd = -1;           /* descriptor was not passed */
215         }
216 #endif
217
218         return(n);
219 }
220
221 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
222 {
223         struct msghdr   msg;
224         struct iovec    iov[1];
225
226 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
227         union {
228                 struct cmsghdr  cm;
229                 char control[CMSG_SPACE(sizeof(int))];
230         } control_un;
231         struct cmsghdr  *cmptr;
232
233         ZERO_STRUCT(msg);
234         ZERO_STRUCT(control_un);
235
236         msg.msg_control = control_un.control;
237         msg.msg_controllen = sizeof(control_un.control);
238
239         cmptr = CMSG_FIRSTHDR(&msg);
240         cmptr->cmsg_len = CMSG_LEN(sizeof(int));
241         cmptr->cmsg_level = SOL_SOCKET;
242         cmptr->cmsg_type = SCM_RIGHTS;
243         memcpy(CMSG_DATA(cmptr), &sendfd, sizeof(sendfd));
244 #else
245         ZERO_STRUCT(msg);
246         msg.msg_accrights = (caddr_t) &sendfd;
247         msg.msg_accrightslen = sizeof(int);
248 #endif
249
250         msg.msg_name = NULL;
251         msg.msg_namelen = 0;
252
253         ZERO_STRUCT(iov);
254         iov[0].iov_base = (void *)ptr;
255         iov[0].iov_len = nbytes;
256         msg.msg_iov = iov;
257         msg.msg_iovlen = 1;
258
259         return (sendmsg(fd, &msg, 0));
260 }
261
262 static void aio_child_cleanup(struct tevent_context *event_ctx,
263                               struct tevent_timer *te,
264                               struct timeval now,
265                               void *private_data)
266 {
267         struct aio_child_list *list = talloc_get_type_abort(
268                 private_data, struct aio_child_list);
269         struct aio_child *child, *next;
270
271         TALLOC_FREE(list->cleanup_event);
272
273         for (child = list->children; child != NULL; child = next) {
274                 next = child->next;
275
276                 if (child->busy) {
277                         DEBUG(10, ("child %d currently active\n",
278                                    (int)child->pid));
279                         continue;
280                 }
281
282                 if (child->dont_delete) {
283                         DEBUG(10, ("Child %d was active since last cleanup\n",
284                                    (int)child->pid));
285                         child->dont_delete = false;
286                         continue;
287                 }
288
289                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
290                            "deleting\n", (int)child->pid));
291
292                 TALLOC_FREE(child);
293                 child = next;
294         }
295
296         if (list->children != NULL) {
297                 /*
298                  * Re-schedule the next cleanup round
299                  */
300                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
301                                                       timeval_add(&now, 30, 0),
302                                                       aio_child_cleanup, list);
303
304         }
305 }
306
307 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
308 {
309         struct aio_child_list *data = NULL;
310
311         if (SMB_VFS_HANDLE_TEST_DATA(handle)) {
312                 SMB_VFS_HANDLE_GET_DATA(handle, data, struct aio_child_list,
313                                         return NULL);
314         }
315
316         if (data == NULL) {
317                 data = talloc_zero(NULL, struct aio_child_list);
318                 if (data == NULL) {
319                         return NULL;
320                 }
321         }
322
323         /*
324          * Regardless of whether the child_list had been around or not, make
325          * sure that we have a cleanup timed event. This timed event will
326          * delete itself when it finds that no children are around anymore.
327          */
328
329         if (data->cleanup_event == NULL) {
330                 data->cleanup_event = tevent_add_timer(server_event_context(), data,
331                                                       timeval_current_ofs(30, 0),
332                                                       aio_child_cleanup, data);
333                 if (data->cleanup_event == NULL) {
334                         TALLOC_FREE(data);
335                         return NULL;
336                 }
337         }
338
339         if (!SMB_VFS_HANDLE_TEST_DATA(handle)) {
340                 SMB_VFS_HANDLE_SET_DATA(handle, data, free_aio_children,
341                                         struct aio_child_list, return False);
342         }
343
344         return data;
345 }
346
347 static void aio_child_loop(int sockfd, struct mmap_area *map)
348 {
349         while (true) {
350                 int fd = -1;
351                 ssize_t ret;
352                 struct rw_cmd cmd_struct;
353                 struct rw_ret ret_struct;
354
355                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
356                 if (ret != sizeof(cmd_struct)) {
357                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
358                                    strerror(errno)));
359                         exit(1);
360                 }
361
362                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
363                            cmd_type_str(cmd_struct.cmd),
364                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
365
366                 if (cmd_struct.erratic_testing_mode) {
367                         /*
368                          * For developer testing, we want erratic behaviour for
369                          * async I/O times
370                          */
371                         uint8_t randval;
372                         unsigned msecs;
373                         /*
374                          * use generate_random_buffer, we just forked from a
375                          * common parent state
376                          */
377                         generate_random_buffer(&randval, sizeof(randval));
378                         msecs = randval + 20;
379                         DEBUG(10, ("delaying for %u msecs\n", msecs));
380                         smb_msleep(msecs);
381                 }
382
383                 ZERO_STRUCT(ret_struct);
384
385                 switch (cmd_struct.cmd) {
386                 case READ_CMD:
387                         ret_struct.size = sys_pread(
388                                 fd, (void *)map->ptr, cmd_struct.n,
389                                 cmd_struct.offset);
390 #if 0
391 /* This breaks "make test" when run with aio_fork module. */
392 #ifdef DEVELOPER
393                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
394 #endif
395 #endif
396                         break;
397                 case WRITE_CMD:
398                         ret_struct.size = sys_pwrite(
399                                 fd, (void *)map->ptr, cmd_struct.n,
400                                 cmd_struct.offset);
401                         break;
402                 case FSYNC_CMD:
403                         ret_struct.size = fsync(fd);
404                         break;
405                 default:
406                         ret_struct.size = -1;
407                         errno = EINVAL;
408                 }
409
410                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
411                            (int)ret_struct.size));
412
413                 if (ret_struct.size == -1) {
414                         ret_struct.ret_errno = errno;
415                 }
416
417                 /*
418                  * Close the fd before telling our parent we're done. The
419                  * parent might close and re-open the file very quickly, and
420                  * with system-level share modes (GPFS) we would get an
421                  * unjustified SHARING_VIOLATION.
422                  */
423                 close(fd);
424
425                 ret = write_data(sockfd, (char *)&ret_struct,
426                                  sizeof(ret_struct));
427                 if (ret != sizeof(ret_struct)) {
428                         DEBUG(10, ("could not write ret_struct: %s\n",
429                                    strerror(errno)));
430                         exit(2);
431                 }
432         }
433 }
434
435 static int aio_child_destructor(struct aio_child *child)
436 {
437         char c=0;
438
439         SMB_ASSERT(!child->busy);
440
441         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
442                         child->pid, child->sockfd));
443
444         /*
445          * closing the sockfd makes the child not return from recvmsg() on RHEL
446          * 5.5 so instead force the child to exit by writing bad data to it
447          */
448         write(child->sockfd, &c, sizeof(c));
449         close(child->sockfd);
450         DLIST_REMOVE(child->list->children, child);
451         return 0;
452 }
453
454 /*
455  * We have to close all fd's in open files, we might incorrectly hold a system
456  * level share mode on a file.
457  */
458
459 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
460                                          void *private_data)
461 {
462         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
463                 close(fsp->fh->fd);
464                 fsp->fh->fd = -1;
465         }
466         return NULL;
467 }
468
469 static int create_aio_child(struct smbd_server_connection *sconn,
470                             struct aio_child_list *children,
471                             size_t map_size,
472                             struct aio_child **presult)
473 {
474         struct aio_child *result;
475         int fdpair[2];
476         int ret;
477
478         fdpair[0] = fdpair[1] = -1;
479
480         result = talloc_zero(children, struct aio_child);
481         if (result == NULL) {
482                 return ENOMEM;
483         }
484
485         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
486                 ret = errno;
487                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
488                 goto fail;
489         }
490
491         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
492
493         result->map = mmap_area_init(result, map_size);
494         if (result->map == NULL) {
495                 ret = errno;
496                 DEBUG(0, ("Could not create mmap area\n"));
497                 goto fail;
498         }
499
500         result->pid = fork();
501         if (result->pid == -1) {
502                 ret = errno;
503                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
504                 goto fail;
505         }
506
507         if (result->pid == 0) {
508                 close(fdpair[0]);
509                 result->sockfd = fdpair[1];
510                 files_forall(sconn, close_fsp_fd, NULL);
511                 aio_child_loop(result->sockfd, result->map);
512         }
513
514         DEBUG(10, ("Child %d created with sockfd %d\n",
515                         result->pid, fdpair[0]));
516
517         result->sockfd = fdpair[0];
518         close(fdpair[1]);
519
520         result->list = children;
521         DLIST_ADD(children->children, result);
522
523         talloc_set_destructor(result, aio_child_destructor);
524
525         *presult = result;
526
527         return 0;
528
529  fail:
530         if (fdpair[0] != -1) close(fdpair[0]);
531         if (fdpair[1] != -1) close(fdpair[1]);
532         TALLOC_FREE(result);
533
534         return ret;
535 }
536
537 static int get_idle_child(struct vfs_handle_struct *handle,
538                           struct aio_child **pchild)
539 {
540         struct aio_child_list *children;
541         struct aio_child *child;
542
543         children = init_aio_children(handle);
544         if (children == NULL) {
545                 return ENOMEM;
546         }
547
548         for (child = children->children; child != NULL; child = child->next) {
549                 if (!child->busy) {
550                         break;
551                 }
552         }
553
554         if (child == NULL) {
555                 int ret;
556
557                 DEBUG(10, ("no idle child found, creating new one\n"));
558
559                 ret = create_aio_child(handle->conn->sconn, children,
560                                           128*1024, &child);
561                 if (ret != 0) {
562                         DEBUG(10, ("create_aio_child failed: %s\n",
563                                    strerror(errno)));
564                         return ret;
565                 }
566         }
567
568         child->dont_delete = true;
569         child->busy = true;
570
571         *pchild = child;
572         return 0;
573 }
574
575 struct aio_fork_pread_state {
576         struct aio_child *child;
577         ssize_t ret;
578         int err;
579 };
580
581 static void aio_fork_pread_done(struct tevent_req *subreq);
582
583 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
584                                               TALLOC_CTX *mem_ctx,
585                                               struct tevent_context *ev,
586                                               struct files_struct *fsp,
587                                               void *data,
588                                               size_t n, off_t offset)
589 {
590         struct tevent_req *req, *subreq;
591         struct aio_fork_pread_state *state;
592         struct rw_cmd cmd;
593         ssize_t written;
594         int err;
595         struct aio_fork_config *config;
596
597         SMB_VFS_HANDLE_GET_DATA(handle, config,
598                                 struct aio_fork_config,
599                                 return NULL);
600
601         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
602         if (req == NULL) {
603                 return NULL;
604         }
605
606         if (n > 128*1024) {
607                 /* TODO: support variable buffers */
608                 tevent_req_error(req, EINVAL);
609                 return tevent_req_post(req, ev);
610         }
611
612         err = get_idle_child(handle, &state->child);
613         if (err != 0) {
614                 tevent_req_error(req, err);
615                 return tevent_req_post(req, ev);
616         }
617
618         ZERO_STRUCT(cmd);
619         cmd.n = n;
620         cmd.offset = offset;
621         cmd.cmd = READ_CMD;
622         cmd.erratic_testing_mode = config->erratic_testing_mode;
623
624         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
625                    (int)state->child->pid));
626
627         /*
628          * Not making this async. We're writing into an empty unix
629          * domain socket. This should never block.
630          */
631         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
632                            fsp->fh->fd);
633         if (written == -1) {
634                 err = errno;
635
636                 TALLOC_FREE(state->child);
637
638                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
639                 tevent_req_error(req, err);
640                 return tevent_req_post(req, ev);
641         }
642
643         subreq = read_packet_send(state, ev, state->child->sockfd,
644                                   sizeof(struct rw_ret), NULL, NULL);
645         if (tevent_req_nomem(subreq, req)) {
646                 TALLOC_FREE(state->child); /* we sent sth down */
647                 return tevent_req_post(req, ev);
648         }
649         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
650         return req;
651 }
652
653 static void aio_fork_pread_done(struct tevent_req *subreq)
654 {
655         struct tevent_req *req = tevent_req_callback_data(
656                 subreq, struct tevent_req);
657         struct aio_fork_pread_state *state = tevent_req_data(
658                 req, struct aio_fork_pread_state);
659         ssize_t nread;
660         uint8_t *buf;
661         int err;
662         struct rw_ret *retbuf;
663
664         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
665         TALLOC_FREE(subreq);
666         if (nread == -1) {
667                 TALLOC_FREE(state->child);
668                 tevent_req_error(req, err);
669                 return;
670         }
671
672         state->child->busy = false;
673
674         retbuf = (struct rw_ret *)buf;
675         state->ret = retbuf->size;
676         state->err = retbuf->ret_errno;
677         tevent_req_done(req);
678 }
679
680 static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err)
681 {
682         struct aio_fork_pread_state *state = tevent_req_data(
683                 req, struct aio_fork_pread_state);
684
685         if (tevent_req_is_unix_error(req, err)) {
686                 return -1;
687         }
688         if (state->ret == -1) {
689                 *err = state->err;
690         }
691         return state->ret;
692 }
693
694 struct aio_fork_pwrite_state {
695         struct aio_child *child;
696         ssize_t ret;
697         int err;
698 };
699
700 static void aio_fork_pwrite_done(struct tevent_req *subreq);
701
702 static struct tevent_req *aio_fork_pwrite_send(
703         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
704         struct tevent_context *ev, struct files_struct *fsp,
705         const void *data, size_t n, off_t offset)
706 {
707         struct tevent_req *req, *subreq;
708         struct aio_fork_pwrite_state *state;
709         struct rw_cmd cmd;
710         ssize_t written;
711         int err;
712         struct aio_fork_config *config;
713         SMB_VFS_HANDLE_GET_DATA(handle, config,
714                                 struct aio_fork_config,
715                                 return NULL);
716
717         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
718         if (req == NULL) {
719                 return NULL;
720         }
721
722         if (n > 128*1024) {
723                 /* TODO: support variable buffers */
724                 tevent_req_error(req, EINVAL);
725                 return tevent_req_post(req, ev);
726         }
727
728         err = get_idle_child(handle, &state->child);
729         if (err != 0) {
730                 tevent_req_error(req, err);
731                 return tevent_req_post(req, ev);
732         }
733
734         ZERO_STRUCT(cmd);
735         cmd.n = n;
736         cmd.offset = offset;
737         cmd.cmd = WRITE_CMD;
738         cmd.erratic_testing_mode = config->erratic_testing_mode;
739
740         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
741                    (int)state->child->pid));
742
743         /*
744          * Not making this async. We're writing into an empty unix
745          * domain socket. This should never block.
746          */
747         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
748                            fsp->fh->fd);
749         if (written == -1) {
750                 err = errno;
751
752                 TALLOC_FREE(state->child);
753
754                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
755                 tevent_req_error(req, err);
756                 return tevent_req_post(req, ev);
757         }
758
759         subreq = read_packet_send(state, ev, state->child->sockfd,
760                                   sizeof(struct rw_ret), NULL, NULL);
761         if (tevent_req_nomem(subreq, req)) {
762                 TALLOC_FREE(state->child); /* we sent sth down */
763                 return tevent_req_post(req, ev);
764         }
765         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
766         return req;
767 }
768
769 static void aio_fork_pwrite_done(struct tevent_req *subreq)
770 {
771         struct tevent_req *req = tevent_req_callback_data(
772                 subreq, struct tevent_req);
773         struct aio_fork_pwrite_state *state = tevent_req_data(
774                 req, struct aio_fork_pwrite_state);
775         ssize_t nread;
776         uint8_t *buf;
777         int err;
778         struct rw_ret *retbuf;
779
780         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
781         TALLOC_FREE(subreq);
782         if (nread == -1) {
783                 TALLOC_FREE(state->child);
784                 tevent_req_error(req, err);
785                 return;
786         }
787
788         state->child->busy = false;
789
790         retbuf = (struct rw_ret *)buf;
791         state->ret = retbuf->size;
792         state->err = retbuf->ret_errno;
793         tevent_req_done(req);
794 }
795
796 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err)
797 {
798         struct aio_fork_pwrite_state *state = tevent_req_data(
799                 req, struct aio_fork_pwrite_state);
800
801         if (tevent_req_is_unix_error(req, err)) {
802                 return -1;
803         }
804         if (state->ret == -1) {
805                 *err = state->err;
806         }
807         return state->ret;
808 }
809
810 struct aio_fork_fsync_state {
811         struct aio_child *child;
812         ssize_t ret;
813         int err;
814 };
815
816 static void aio_fork_fsync_done(struct tevent_req *subreq);
817
818 static struct tevent_req *aio_fork_fsync_send(
819         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
820         struct tevent_context *ev, struct files_struct *fsp)
821 {
822         struct tevent_req *req, *subreq;
823         struct aio_fork_fsync_state *state;
824         struct rw_cmd cmd;
825         ssize_t written;
826         int err;
827         struct aio_fork_config *config;
828
829         SMB_VFS_HANDLE_GET_DATA(handle, config,
830                                 struct aio_fork_config,
831                                 return NULL);
832
833         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
834         if (req == NULL) {
835                 return NULL;
836         }
837
838         err = get_idle_child(handle, &state->child);
839         if (err != 0) {
840                 tevent_req_error(req, err);
841                 return tevent_req_post(req, ev);
842         }
843
844         ZERO_STRUCT(cmd);
845         cmd.cmd = FSYNC_CMD;
846         cmd.erratic_testing_mode = config->erratic_testing_mode;
847
848         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
849                    (int)state->child->pid));
850
851         /*
852          * Not making this async. We're writing into an empty unix
853          * domain socket. This should never block.
854          */
855         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
856                            fsp->fh->fd);
857         if (written == -1) {
858                 err = errno;
859
860                 TALLOC_FREE(state->child);
861
862                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
863                 tevent_req_error(req, err);
864                 return tevent_req_post(req, ev);
865         }
866
867         subreq = read_packet_send(state, ev, state->child->sockfd,
868                                   sizeof(struct rw_ret), NULL, NULL);
869         if (tevent_req_nomem(subreq, req)) {
870                 TALLOC_FREE(state->child); /* we sent sth down */
871                 return tevent_req_post(req, ev);
872         }
873         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
874         return req;
875 }
876
877 static void aio_fork_fsync_done(struct tevent_req *subreq)
878 {
879         struct tevent_req *req = tevent_req_callback_data(
880                 subreq, struct tevent_req);
881         struct aio_fork_fsync_state *state = tevent_req_data(
882                 req, struct aio_fork_fsync_state);
883         ssize_t nread;
884         uint8_t *buf;
885         int err;
886         struct rw_ret *retbuf;
887
888         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
889         TALLOC_FREE(subreq);
890         if (nread == -1) {
891                 TALLOC_FREE(state->child);
892                 tevent_req_error(req, err);
893                 return;
894         }
895
896         state->child->busy = false;
897
898         retbuf = (struct rw_ret *)buf;
899         state->ret = retbuf->size;
900         state->err = retbuf->ret_errno;
901         tevent_req_done(req);
902 }
903
904 static int aio_fork_fsync_recv(struct tevent_req *req, int *err)
905 {
906         struct aio_fork_fsync_state *state = tevent_req_data(
907                 req, struct aio_fork_fsync_state);
908
909         if (tevent_req_is_unix_error(req, err)) {
910                 return -1;
911         }
912         if (state->ret == -1) {
913                 *err = state->err;
914         }
915         return state->ret;
916 }
917
918 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
919                             const char *user)
920 {
921         int ret;
922         struct aio_fork_config *config;
923         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
924
925         if (ret < 0) {
926                 return ret;
927         }
928
929         config = talloc_zero(handle->conn, struct aio_fork_config);
930         if (!config) {
931                 SMB_VFS_NEXT_DISCONNECT(handle);
932                 DEBUG(0, ("talloc_zero() failed\n"));
933                 return -1;
934         }
935
936         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
937                                                     "erratic_testing_mode", false);
938         
939         SMB_VFS_HANDLE_SET_DATA(handle, config,
940                                 NULL, struct aio_fork_config,
941                                 return -1);
942
943         /*********************************************************************
944          * How many threads to initialize ?
945          * 100 per process seems insane as a default until you realize that
946          * (a) Threads terminate after 1 second when idle.
947          * (b) Throttling is done in SMB2 via the crediting algorithm.
948          * (c) SMB1 clients are limited to max_mux (50) outstanding
949          *     requests and Windows clients don't use this anyway.
950          * Essentially we want this to be unlimited unless smb.conf
951          * says different.
952          *********************************************************************/
953         aio_pending_size = 100;
954         return 0;
955 }
956
957 static struct vfs_fn_pointers vfs_aio_fork_fns = {
958         .connect_fn = aio_fork_connect,
959         .pread_send_fn = aio_fork_pread_send,
960         .pread_recv_fn = aio_fork_pread_recv,
961         .pwrite_send_fn = aio_fork_pwrite_send,
962         .pwrite_recv_fn = aio_fork_pwrite_recv,
963         .fsync_send_fn = aio_fork_fsync_send,
964         .fsync_recv_fn = aio_fork_fsync_recv,
965 };
966
967 NTSTATUS vfs_aio_fork_init(void);
968 NTSTATUS vfs_aio_fork_init(void)
969 {
970         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
971                                 "aio_fork", &vfs_aio_fork_fns);
972 }