lib: give global_contexts.c its own header file
[samba.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29 #include "lib/util/sys_rw.h"
30 #include "lib/util/sys_rw_data.h"
31 #include "lib/util/msghdr.h"
32 #include "smbprofile.h"
33 #include "lib/global_contexts.h"
34
35 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
36 # error Can not pass file descriptors
37 #endif
38
39 #undef recvmsg
40
41 #ifndef MAP_FILE
42 #define MAP_FILE 0
43 #endif
44
45 struct aio_child_list;
46
47 struct aio_fork_config {
48         bool erratic_testing_mode;
49         struct aio_child_list *children;
50 };
51
52 struct mmap_area {
53         size_t size;
54         void *ptr;
55 };
56
57 static int mmap_area_destructor(struct mmap_area *area)
58 {
59         munmap(discard_const(area->ptr), area->size);
60         return 0;
61 }
62
63 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
64 {
65         struct mmap_area *result;
66         int fd;
67
68         result = talloc(mem_ctx, struct mmap_area);
69         if (result == NULL) {
70                 DEBUG(0, ("talloc failed\n"));
71                 goto fail;
72         }
73
74         fd = open("/dev/zero", O_RDWR);
75         if (fd == -1) {
76                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
77                           strerror(errno)));
78                 goto fail;
79         }
80
81         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
82                            MAP_SHARED|MAP_FILE, fd, 0);
83         close(fd);
84         if (result->ptr == MAP_FAILED) {
85                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
86                 goto fail;
87         }
88
89         result->size = size;
90         talloc_set_destructor(result, mmap_area_destructor);
91
92         return result;
93
94 fail:
95         TALLOC_FREE(result);
96         return NULL;
97 }
98
99 enum cmd_type {
100         READ_CMD,
101         WRITE_CMD,
102         FSYNC_CMD
103 };
104
105 static const char *cmd_type_str(enum cmd_type cmd)
106 {
107         const char *result;
108
109         switch (cmd) {
110         case READ_CMD:
111                 result = "READ";
112                 break;
113         case WRITE_CMD:
114                 result = "WRITE";
115                 break;
116         case FSYNC_CMD:
117                 result = "FSYNC";
118                 break;
119         default:
120                 result = "<UNKNOWN>";
121                 break;
122         }
123         return result;
124 }
125
126 struct rw_cmd {
127         size_t n;
128         off_t offset;
129         enum cmd_type cmd;
130         bool erratic_testing_mode;
131 };
132
133 struct rw_ret {
134         ssize_t size;
135         int ret_errno;
136         uint64_t duration;
137 };
138
139 struct aio_child_list;
140
141 struct aio_child {
142         struct aio_child *prev, *next;
143         struct aio_child_list *list;
144         pid_t pid;
145         int sockfd;
146         struct mmap_area *map;
147         bool dont_delete;       /* Marked as in use since last cleanup */
148         bool busy;
149 };
150
151 struct aio_child_list {
152         struct aio_child *children;
153         struct tevent_timer *cleanup_event;
154 };
155
156 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
157 {
158         struct iovec iov[1];
159         struct msghdr msg = { .msg_iov = iov, .msg_iovlen = 1 };
160         ssize_t n;
161         size_t bufsize = msghdr_prep_recv_fds(NULL, NULL, 0, 1);
162         uint8_t buf[bufsize];
163
164         msghdr_prep_recv_fds(&msg, buf, bufsize, 1);
165
166         iov[0].iov_base = (void *)ptr;
167         iov[0].iov_len = nbytes;
168
169         do {
170                 n = recvmsg(fd, &msg, 0);
171         } while ((n == -1) && (errno == EINTR));
172
173         if (n <= 0) {
174                 return n;
175         }
176
177         {
178                 size_t num_fds = msghdr_extract_fds(&msg, NULL, 0);
179                 int fds[num_fds];
180
181                 msghdr_extract_fds(&msg, fds, num_fds);
182
183                 if (num_fds != 1) {
184                         size_t i;
185
186                         for (i=0; i<num_fds; i++) {
187                                 close(fds[i]);
188                         }
189
190                         *recvfd = -1;
191                         return n;
192                 }
193
194                 *recvfd = fds[0];
195         }
196
197         return(n);
198 }
199
200 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
201 {
202         struct msghdr msg = {0};
203         size_t bufsize = msghdr_prep_fds(NULL, NULL, 0, &sendfd, 1);
204         uint8_t buf[bufsize];
205         struct iovec iov;
206         ssize_t sent;
207
208         msghdr_prep_fds(&msg, buf, bufsize, &sendfd, 1);
209
210         iov.iov_base = (void *)ptr;
211         iov.iov_len = nbytes;
212         msg.msg_iov = &iov;
213         msg.msg_iovlen = 1;
214
215         do {
216                 sent = sendmsg(fd, &msg, 0);
217         } while ((sent == -1) && (errno == EINTR));
218
219         return sent;
220 }
221
222 static void aio_child_cleanup(struct tevent_context *event_ctx,
223                               struct tevent_timer *te,
224                               struct timeval now,
225                               void *private_data)
226 {
227         struct aio_child_list *list = talloc_get_type_abort(
228                 private_data, struct aio_child_list);
229         struct aio_child *child, *next;
230
231         TALLOC_FREE(list->cleanup_event);
232
233         for (child = list->children; child != NULL; child = next) {
234                 next = child->next;
235
236                 if (child->busy) {
237                         DEBUG(10, ("child %d currently active\n",
238                                    (int)child->pid));
239                         continue;
240                 }
241
242                 if (child->dont_delete) {
243                         DEBUG(10, ("Child %d was active since last cleanup\n",
244                                    (int)child->pid));
245                         child->dont_delete = false;
246                         continue;
247                 }
248
249                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
250                            "deleting\n", (int)child->pid));
251
252                 TALLOC_FREE(child);
253                 child = next;
254         }
255
256         if (list->children != NULL) {
257                 /*
258                  * Re-schedule the next cleanup round
259                  */
260                 list->cleanup_event = tevent_add_timer(global_event_context(), list,
261                                                       timeval_add(&now, 30, 0),
262                                                       aio_child_cleanup, list);
263
264         }
265 }
266
267 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
268 {
269         struct aio_fork_config *config;
270         struct aio_child_list *children;
271
272         SMB_VFS_HANDLE_GET_DATA(handle, config, struct aio_fork_config,
273                                 return NULL);
274
275         if (config->children == NULL) {
276                 config->children = talloc_zero(config, struct aio_child_list);
277                 if (config->children == NULL) {
278                         return NULL;
279                 }
280         }
281         children = config->children;
282
283         /*
284          * Regardless of whether the child_list had been around or not, make
285          * sure that we have a cleanup timed event. This timed event will
286          * delete itself when it finds that no children are around anymore.
287          */
288
289         if (children->cleanup_event == NULL) {
290                 children->cleanup_event =
291                         tevent_add_timer(global_event_context(), children,
292                                          timeval_current_ofs(30, 0),
293                                          aio_child_cleanup, children);
294                 if (children->cleanup_event == NULL) {
295                         TALLOC_FREE(config->children);
296                         return NULL;
297                 }
298         }
299
300         return children;
301 }
302
303 static void aio_child_loop(int sockfd, struct mmap_area *map)
304 {
305         while (true) {
306                 int fd = -1;
307                 ssize_t ret;
308                 struct rw_cmd cmd_struct;
309                 struct rw_ret ret_struct;
310                 struct timespec start, end;
311
312                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
313                 if (ret != sizeof(cmd_struct)) {
314                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
315                                    strerror(errno)));
316                         exit(1);
317                 }
318
319                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
320                            cmd_type_str(cmd_struct.cmd),
321                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
322
323                 if (cmd_struct.erratic_testing_mode) {
324                         /*
325                          * For developer testing, we want erratic behaviour for
326                          * async I/O times
327                          */
328                         uint8_t randval;
329                         unsigned msecs;
330                         /*
331                          * use generate_random_buffer, we just forked from a
332                          * common parent state
333                          */
334                         generate_random_buffer(&randval, sizeof(randval));
335                         msecs = (randval%20)+1;
336                         DEBUG(10, ("delaying for %u msecs\n", msecs));
337                         smb_msleep(msecs);
338                 }
339
340                 ZERO_STRUCT(ret_struct);
341
342                 PROFILE_TIMESTAMP(&start);
343
344                 switch (cmd_struct.cmd) {
345                 case READ_CMD:
346                         ret_struct.size = sys_pread_full(
347                                 fd, discard_const(map->ptr), cmd_struct.n,
348                                 cmd_struct.offset);
349 #if 0
350 /* This breaks "make test" when run with aio_fork module. */
351 #ifdef DEVELOPER
352                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
353 #endif
354 #endif
355                         break;
356                 case WRITE_CMD:
357                         ret_struct.size = sys_pwrite_full(
358                                 fd, discard_const(map->ptr), cmd_struct.n,
359                                 cmd_struct.offset);
360                         break;
361                 case FSYNC_CMD:
362                         ret_struct.size = fsync(fd);
363                         break;
364                 default:
365                         ret_struct.size = -1;
366                         errno = EINVAL;
367                 }
368
369                 PROFILE_TIMESTAMP(&end);
370                 ret_struct.duration = nsec_time_diff(&end, &start);
371                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
372                            (int)ret_struct.size));
373
374                 if (ret_struct.size == -1) {
375                         ret_struct.ret_errno = errno;
376                 }
377
378                 /*
379                  * Close the fd before telling our parent we're done. The
380                  * parent might close and re-open the file very quickly, and
381                  * with system-level share modes (GPFS) we would get an
382                  * unjustified SHARING_VIOLATION.
383                  */
384                 close(fd);
385
386                 ret = write_data(sockfd, (char *)&ret_struct,
387                                  sizeof(ret_struct));
388                 if (ret != sizeof(ret_struct)) {
389                         DEBUG(10, ("could not write ret_struct: %s\n",
390                                    strerror(errno)));
391                         exit(2);
392                 }
393         }
394 }
395
396 static int aio_child_destructor(struct aio_child *child)
397 {
398         char c=0;
399
400         SMB_ASSERT(!child->busy);
401
402         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
403                    (int)child->pid, child->sockfd));
404
405         /*
406          * closing the sockfd makes the child not return from recvmsg() on RHEL
407          * 5.5 so instead force the child to exit by writing bad data to it
408          */
409         sys_write_v(child->sockfd, &c, sizeof(c));
410         close(child->sockfd);
411         DLIST_REMOVE(child->list->children, child);
412         return 0;
413 }
414
415 /*
416  * We have to close all fd's in open files, we might incorrectly hold a system
417  * level share mode on a file.
418  */
419
420 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
421                                          void *private_data)
422 {
423         if ((fsp->fh != NULL) && (fsp_get_pathref_fd(fsp) != -1)) {
424                 close(fsp_get_pathref_fd(fsp));
425                 fsp_set_fd(fsp, -1);
426         }
427         return NULL;
428 }
429
430 static int create_aio_child(struct smbd_server_connection *sconn,
431                             struct aio_child_list *children,
432                             size_t map_size,
433                             struct aio_child **presult)
434 {
435         struct aio_child *result;
436         int fdpair[2];
437         int ret;
438
439         fdpair[0] = fdpair[1] = -1;
440
441         result = talloc_zero(children, struct aio_child);
442         if (result == NULL) {
443                 return ENOMEM;
444         }
445
446         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
447                 ret = errno;
448                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
449                 goto fail;
450         }
451
452         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
453
454         result->map = mmap_area_init(result, map_size);
455         if (result->map == NULL) {
456                 ret = errno;
457                 DEBUG(0, ("Could not create mmap area\n"));
458                 goto fail;
459         }
460
461         result->pid = fork();
462         if (result->pid == -1) {
463                 ret = errno;
464                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
465                 goto fail;
466         }
467
468         if (result->pid == 0) {
469                 close(fdpair[0]);
470                 result->sockfd = fdpair[1];
471                 files_forall(sconn, close_fsp_fd, NULL);
472                 aio_child_loop(result->sockfd, result->map);
473         }
474
475         DEBUG(10, ("Child %d created with sockfd %d\n",
476                    (int)result->pid, fdpair[0]));
477
478         result->sockfd = fdpair[0];
479         close(fdpair[1]);
480
481         result->list = children;
482         DLIST_ADD(children->children, result);
483
484         talloc_set_destructor(result, aio_child_destructor);
485
486         *presult = result;
487
488         return 0;
489
490  fail:
491         if (fdpair[0] != -1) close(fdpair[0]);
492         if (fdpair[1] != -1) close(fdpair[1]);
493         TALLOC_FREE(result);
494
495         return ret;
496 }
497
498 static int get_idle_child(struct vfs_handle_struct *handle,
499                           struct aio_child **pchild)
500 {
501         struct aio_child_list *children;
502         struct aio_child *child;
503
504         children = init_aio_children(handle);
505         if (children == NULL) {
506                 return ENOMEM;
507         }
508
509         for (child = children->children; child != NULL; child = child->next) {
510                 if (!child->busy) {
511                         break;
512                 }
513         }
514
515         if (child == NULL) {
516                 int ret;
517
518                 DEBUG(10, ("no idle child found, creating new one\n"));
519
520                 ret = create_aio_child(handle->conn->sconn, children,
521                                           128*1024, &child);
522                 if (ret != 0) {
523                         DEBUG(10, ("create_aio_child failed: %s\n",
524                                    strerror(errno)));
525                         return ret;
526                 }
527         }
528
529         child->dont_delete = true;
530         child->busy = true;
531
532         *pchild = child;
533         return 0;
534 }
535
536 struct aio_fork_pread_state {
537         struct aio_child *child;
538         size_t n;
539         void *data;
540         ssize_t ret;
541         struct vfs_aio_state vfs_aio_state;
542 };
543
544 static void aio_fork_pread_done(struct tevent_req *subreq);
545
546 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
547                                               TALLOC_CTX *mem_ctx,
548                                               struct tevent_context *ev,
549                                               struct files_struct *fsp,
550                                               void *data,
551                                               size_t n, off_t offset)
552 {
553         struct tevent_req *req, *subreq;
554         struct aio_fork_pread_state *state;
555         struct rw_cmd cmd;
556         ssize_t written;
557         int err;
558         struct aio_fork_config *config;
559
560         SMB_VFS_HANDLE_GET_DATA(handle, config,
561                                 struct aio_fork_config,
562                                 return NULL);
563
564         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
565         if (req == NULL) {
566                 return NULL;
567         }
568         state->n = n;
569         state->data = data;
570
571         if (n > 128*1024) {
572                 /* TODO: support variable buffers */
573                 tevent_req_error(req, EINVAL);
574                 return tevent_req_post(req, ev);
575         }
576
577         err = get_idle_child(handle, &state->child);
578         if (err != 0) {
579                 tevent_req_error(req, err);
580                 return tevent_req_post(req, ev);
581         }
582
583         ZERO_STRUCT(cmd);
584         cmd.n = n;
585         cmd.offset = offset;
586         cmd.cmd = READ_CMD;
587         cmd.erratic_testing_mode = config->erratic_testing_mode;
588
589         DEBUG(10, ("sending fd %d to child %d\n", fsp_get_io_fd(fsp),
590                    (int)state->child->pid));
591
592         /*
593          * Not making this async. We're writing into an empty unix
594          * domain socket. This should never block.
595          */
596         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
597                            fsp_get_io_fd(fsp));
598         if (written == -1) {
599                 err = errno;
600
601                 TALLOC_FREE(state->child);
602
603                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
604                 tevent_req_error(req, err);
605                 return tevent_req_post(req, ev);
606         }
607
608         subreq = read_packet_send(state, ev, state->child->sockfd,
609                                   sizeof(struct rw_ret), NULL, NULL);
610         if (tevent_req_nomem(subreq, req)) {
611                 TALLOC_FREE(state->child); /* we sent sth down */
612                 return tevent_req_post(req, ev);
613         }
614         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
615         return req;
616 }
617
618 static void aio_fork_pread_done(struct tevent_req *subreq)
619 {
620         struct tevent_req *req = tevent_req_callback_data(
621                 subreq, struct tevent_req);
622         struct aio_fork_pread_state *state = tevent_req_data(
623                 req, struct aio_fork_pread_state);
624         ssize_t nread;
625         uint8_t *buf;
626         int err;
627         struct rw_ret *retbuf;
628
629         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
630         TALLOC_FREE(subreq);
631         if (nread == -1) {
632                 TALLOC_FREE(state->child);
633                 tevent_req_error(req, err);
634                 return;
635         }
636
637         retbuf = (struct rw_ret *)buf;
638         state->ret = retbuf->size;
639         state->vfs_aio_state.error = retbuf->ret_errno;
640         state->vfs_aio_state.duration = retbuf->duration;
641
642         if ((size_t)state->ret > state->n) {
643                 tevent_req_error(req, EIO);
644                 state->child->busy = false;
645                 return;
646         }
647         memcpy(state->data, state->child->map->ptr, state->ret);
648
649         state->child->busy = false;
650
651         tevent_req_done(req);
652 }
653
654 static ssize_t aio_fork_pread_recv(struct tevent_req *req,
655                                    struct vfs_aio_state *vfs_aio_state)
656 {
657         struct aio_fork_pread_state *state = tevent_req_data(
658                 req, struct aio_fork_pread_state);
659
660         if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
661                 return -1;
662         }
663         *vfs_aio_state = state->vfs_aio_state;
664         return state->ret;
665 }
666
667 struct aio_fork_pwrite_state {
668         struct aio_child *child;
669         ssize_t ret;
670         struct vfs_aio_state vfs_aio_state;
671 };
672
673 static void aio_fork_pwrite_done(struct tevent_req *subreq);
674
675 static struct tevent_req *aio_fork_pwrite_send(
676         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
677         struct tevent_context *ev, struct files_struct *fsp,
678         const void *data, size_t n, off_t offset)
679 {
680         struct tevent_req *req, *subreq;
681         struct aio_fork_pwrite_state *state;
682         struct rw_cmd cmd;
683         ssize_t written;
684         int err;
685         struct aio_fork_config *config;
686         SMB_VFS_HANDLE_GET_DATA(handle, config,
687                                 struct aio_fork_config,
688                                 return NULL);
689
690         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
691         if (req == NULL) {
692                 return NULL;
693         }
694
695         if (n > 128*1024) {
696                 /* TODO: support variable buffers */
697                 tevent_req_error(req, EINVAL);
698                 return tevent_req_post(req, ev);
699         }
700
701         err = get_idle_child(handle, &state->child);
702         if (err != 0) {
703                 tevent_req_error(req, err);
704                 return tevent_req_post(req, ev);
705         }
706
707         memcpy(state->child->map->ptr, data, n);
708
709         ZERO_STRUCT(cmd);
710         cmd.n = n;
711         cmd.offset = offset;
712         cmd.cmd = WRITE_CMD;
713         cmd.erratic_testing_mode = config->erratic_testing_mode;
714
715         DEBUG(10, ("sending fd %d to child %d\n", fsp_get_io_fd(fsp),
716                    (int)state->child->pid));
717
718         /*
719          * Not making this async. We're writing into an empty unix
720          * domain socket. This should never block.
721          */
722         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
723                            fsp_get_io_fd(fsp));
724         if (written == -1) {
725                 err = errno;
726
727                 TALLOC_FREE(state->child);
728
729                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
730                 tevent_req_error(req, err);
731                 return tevent_req_post(req, ev);
732         }
733
734         subreq = read_packet_send(state, ev, state->child->sockfd,
735                                   sizeof(struct rw_ret), NULL, NULL);
736         if (tevent_req_nomem(subreq, req)) {
737                 TALLOC_FREE(state->child); /* we sent sth down */
738                 return tevent_req_post(req, ev);
739         }
740         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
741         return req;
742 }
743
744 static void aio_fork_pwrite_done(struct tevent_req *subreq)
745 {
746         struct tevent_req *req = tevent_req_callback_data(
747                 subreq, struct tevent_req);
748         struct aio_fork_pwrite_state *state = tevent_req_data(
749                 req, struct aio_fork_pwrite_state);
750         ssize_t nread;
751         uint8_t *buf;
752         int err;
753         struct rw_ret *retbuf;
754
755         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
756         TALLOC_FREE(subreq);
757         if (nread == -1) {
758                 TALLOC_FREE(state->child);
759                 tevent_req_error(req, err);
760                 return;
761         }
762
763         state->child->busy = false;
764
765         retbuf = (struct rw_ret *)buf;
766         state->ret = retbuf->size;
767         state->vfs_aio_state.error = retbuf->ret_errno;
768         state->vfs_aio_state.duration = retbuf->duration;
769         tevent_req_done(req);
770 }
771
772 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req,
773                                     struct vfs_aio_state *vfs_aio_state)
774 {
775         struct aio_fork_pwrite_state *state = tevent_req_data(
776                 req, struct aio_fork_pwrite_state);
777
778         if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
779                 return -1;
780         }
781         *vfs_aio_state = state->vfs_aio_state;
782         return state->ret;
783 }
784
785 struct aio_fork_fsync_state {
786         struct aio_child *child;
787         ssize_t ret;
788         struct vfs_aio_state vfs_aio_state;
789 };
790
791 static void aio_fork_fsync_done(struct tevent_req *subreq);
792
793 static struct tevent_req *aio_fork_fsync_send(
794         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
795         struct tevent_context *ev, struct files_struct *fsp)
796 {
797         struct tevent_req *req, *subreq;
798         struct aio_fork_fsync_state *state;
799         struct rw_cmd cmd;
800         ssize_t written;
801         int err;
802         struct aio_fork_config *config;
803
804         SMB_VFS_HANDLE_GET_DATA(handle, config,
805                                 struct aio_fork_config,
806                                 return NULL);
807
808         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
809         if (req == NULL) {
810                 return NULL;
811         }
812
813         err = get_idle_child(handle, &state->child);
814         if (err != 0) {
815                 tevent_req_error(req, err);
816                 return tevent_req_post(req, ev);
817         }
818
819         ZERO_STRUCT(cmd);
820         cmd.cmd = FSYNC_CMD;
821         cmd.erratic_testing_mode = config->erratic_testing_mode;
822
823         DEBUG(10, ("sending fd %d to child %d\n", fsp_get_io_fd(fsp),
824                    (int)state->child->pid));
825
826         /*
827          * Not making this async. We're writing into an empty unix
828          * domain socket. This should never block.
829          */
830         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
831                            fsp_get_io_fd(fsp));
832         if (written == -1) {
833                 err = errno;
834
835                 TALLOC_FREE(state->child);
836
837                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
838                 tevent_req_error(req, err);
839                 return tevent_req_post(req, ev);
840         }
841
842         subreq = read_packet_send(state, ev, state->child->sockfd,
843                                   sizeof(struct rw_ret), NULL, NULL);
844         if (tevent_req_nomem(subreq, req)) {
845                 TALLOC_FREE(state->child); /* we sent sth down */
846                 return tevent_req_post(req, ev);
847         }
848         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
849         return req;
850 }
851
852 static void aio_fork_fsync_done(struct tevent_req *subreq)
853 {
854         struct tevent_req *req = tevent_req_callback_data(
855                 subreq, struct tevent_req);
856         struct aio_fork_fsync_state *state = tevent_req_data(
857                 req, struct aio_fork_fsync_state);
858         ssize_t nread;
859         uint8_t *buf;
860         int err;
861         struct rw_ret *retbuf;
862
863         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
864         TALLOC_FREE(subreq);
865         if (nread == -1) {
866                 TALLOC_FREE(state->child);
867                 tevent_req_error(req, err);
868                 return;
869         }
870
871         state->child->busy = false;
872
873         retbuf = (struct rw_ret *)buf;
874         state->ret = retbuf->size;
875         state->vfs_aio_state.error = retbuf->ret_errno;
876         state->vfs_aio_state.duration = retbuf->duration;
877         tevent_req_done(req);
878 }
879
880 static int aio_fork_fsync_recv(struct tevent_req *req,
881                                struct vfs_aio_state *vfs_aio_state)
882 {
883         struct aio_fork_fsync_state *state = tevent_req_data(
884                 req, struct aio_fork_fsync_state);
885
886         if (tevent_req_is_unix_error(req, &vfs_aio_state->error)) {
887                 return -1;
888         }
889         *vfs_aio_state = state->vfs_aio_state;
890         return state->ret;
891 }
892
893 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
894                             const char *user)
895 {
896         int ret;
897         struct aio_fork_config *config;
898         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
899
900         if (ret < 0) {
901                 return ret;
902         }
903
904         config = talloc_zero(handle->conn, struct aio_fork_config);
905         if (!config) {
906                 SMB_VFS_NEXT_DISCONNECT(handle);
907                 DEBUG(0, ("talloc_zero() failed\n"));
908                 return -1;
909         }
910
911         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
912                                                     "erratic_testing_mode", false);
913         
914         SMB_VFS_HANDLE_SET_DATA(handle, config,
915                                 NULL, struct aio_fork_config,
916                                 return -1);
917
918         return 0;
919 }
920
921 static struct vfs_fn_pointers vfs_aio_fork_fns = {
922         .connect_fn = aio_fork_connect,
923         .pread_send_fn = aio_fork_pread_send,
924         .pread_recv_fn = aio_fork_pread_recv,
925         .pwrite_send_fn = aio_fork_pwrite_send,
926         .pwrite_recv_fn = aio_fork_pwrite_recv,
927         .fsync_send_fn = aio_fork_fsync_send,
928         .fsync_recv_fn = aio_fork_fsync_recv,
929 };
930
931 static_decl_vfs;
932 NTSTATUS vfs_aio_fork_init(TALLOC_CTX *ctx)
933 {
934         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
935                                 "aio_fork", &vfs_aio_fork_fns);
936 }