s3: lib, s3: modules: Fix compilation on Solaris.
[samba.git] / source3 / modules / vfs_aio_fork.c
1 /*
2  * Simulate the Posix AIO using mmap/fork
3  *
4  * Copyright (C) Volker Lendecke 2008
5  * Copyright (C) Jeremy Allison 2010
6  *
7  * This program is free software; you can redistribute it and/or modify
8  * it under the terms of the GNU General Public License as published by
9  * the Free Software Foundation; either version 2 of the License, or
10  * (at your option) any later version.
11  *
12  * This program is distributed in the hope that it will be useful,
13  * but WITHOUT ANY WARRANTY; without even the implied warranty of
14  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  * GNU General Public License for more details.
16  *
17  * You should have received a copy of the GNU General Public License
18  * along with this program; if not, write to the Free Software
19  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
20  */
21
22 #include "includes.h"
23 #include "system/filesys.h"
24 #include "system/shmem.h"
25 #include "smbd/smbd.h"
26 #include "smbd/globals.h"
27 #include "lib/async_req/async_sock.h"
28 #include "lib/util/tevent_unix.h"
29
30 #if !defined(HAVE_STRUCT_MSGHDR_MSG_CONTROL) && !defined(HAVE_STRUCT_MSGHDR_MSG_ACCRIGHTS)
31 # error Can not pass file descriptors
32 #endif
33
34 #undef recvmsg
35
36 #ifndef MAP_FILE
37 #define MAP_FILE 0
38 #endif
39
40 struct aio_fork_config {
41         bool erratic_testing_mode;
42 };
43
44 struct mmap_area {
45         size_t size;
46         volatile void *ptr;
47 };
48
49 static int mmap_area_destructor(struct mmap_area *area)
50 {
51         munmap((void *)area->ptr, area->size);
52         return 0;
53 }
54
55 static struct mmap_area *mmap_area_init(TALLOC_CTX *mem_ctx, size_t size)
56 {
57         struct mmap_area *result;
58         int fd;
59
60         result = talloc(mem_ctx, struct mmap_area);
61         if (result == NULL) {
62                 DEBUG(0, ("talloc failed\n"));
63                 goto fail;
64         }
65
66         fd = open("/dev/zero", O_RDWR);
67         if (fd == -1) {
68                 DEBUG(3, ("open(\"/dev/zero\") failed: %s\n",
69                           strerror(errno)));
70                 goto fail;
71         }
72
73         result->ptr = mmap(NULL, size, PROT_READ|PROT_WRITE,
74                            MAP_SHARED|MAP_FILE, fd, 0);
75         close(fd);
76         if (result->ptr == MAP_FAILED) {
77                 DEBUG(1, ("mmap failed: %s\n", strerror(errno)));
78                 goto fail;
79         }
80
81         result->size = size;
82         talloc_set_destructor(result, mmap_area_destructor);
83
84         return result;
85
86 fail:
87         TALLOC_FREE(result);
88         return NULL;
89 }
90
91 enum cmd_type {
92         READ_CMD,
93         WRITE_CMD,
94         FSYNC_CMD
95 };
96
97 static const char *cmd_type_str(enum cmd_type cmd)
98 {
99         const char *result;
100
101         switch (cmd) {
102         case READ_CMD:
103                 result = "READ";
104                 break;
105         case WRITE_CMD:
106                 result = "WRITE";
107                 break;
108         case FSYNC_CMD:
109                 result = "FSYNC";
110                 break;
111         default:
112                 result = "<UNKNOWN>";
113                 break;
114         }
115         return result;
116 }
117
118 struct rw_cmd {
119         size_t n;
120         off_t offset;
121         enum cmd_type cmd;
122         bool erratic_testing_mode;
123 };
124
125 struct rw_ret {
126         ssize_t size;
127         int ret_errno;
128 };
129
130 struct aio_child_list;
131
132 struct aio_child {
133         struct aio_child *prev, *next;
134         struct aio_child_list *list;
135         pid_t pid;
136         int sockfd;
137         struct mmap_area *map;
138         bool dont_delete;       /* Marked as in use since last cleanup */
139         bool busy;
140 };
141
142 struct aio_child_list {
143         struct aio_child *children;
144         struct tevent_timer *cleanup_event;
145 };
146
147 static void free_aio_children(void **p)
148 {
149         TALLOC_FREE(*p);
150 }
151
152 static ssize_t read_fd(int fd, void *ptr, size_t nbytes, int *recvfd)
153 {
154         struct msghdr msg;
155         struct iovec iov[1];
156         ssize_t n;
157 #ifndef HAVE_STRUCT_MSGHDR_MSG_CONTROL
158         int newfd;
159
160         ZERO_STRUCT(msg);
161         msg.msg_accrights = (caddr_t) &newfd;
162         msg.msg_accrightslen = sizeof(int);
163 #else
164
165         union {
166           struct cmsghdr        cm;
167           char                          control[CMSG_SPACE(sizeof(int))];
168         } control_un;
169         struct cmsghdr  *cmptr;
170
171         ZERO_STRUCT(msg);
172         ZERO_STRUCT(control_un);
173
174         msg.msg_control = control_un.control;
175         msg.msg_controllen = sizeof(control_un.control);
176 #endif
177
178         msg.msg_name = NULL;
179         msg.msg_namelen = 0;
180
181         iov[0].iov_base = (void *)ptr;
182         iov[0].iov_len = nbytes;
183         msg.msg_iov = iov;
184         msg.msg_iovlen = 1;
185
186         if ( (n = recvmsg(fd, &msg, 0)) <= 0) {
187                 return(n);
188         }
189
190 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
191         if ((cmptr = CMSG_FIRSTHDR(&msg)) != NULL
192             && cmptr->cmsg_len == CMSG_LEN(sizeof(int))) {
193                 if (cmptr->cmsg_level != SOL_SOCKET) {
194                         DEBUG(10, ("control level != SOL_SOCKET"));
195                         errno = EINVAL;
196                         return -1;
197                 }
198                 if (cmptr->cmsg_type != SCM_RIGHTS) {
199                         DEBUG(10, ("control type != SCM_RIGHTS"));
200                         errno = EINVAL;
201                         return -1;
202                 }
203                 memcpy(recvfd, CMSG_DATA(cmptr), sizeof(*recvfd));
204         } else {
205                 *recvfd = -1;           /* descriptor was not passed */
206         }
207 #else
208         if (msg.msg_accrightslen == sizeof(int)) {
209                 *recvfd = newfd;
210         }
211         else {
212                 *recvfd = -1;           /* descriptor was not passed */
213         }
214 #endif
215
216         return(n);
217 }
218
219 static ssize_t write_fd(int fd, void *ptr, size_t nbytes, int sendfd)
220 {
221         struct msghdr   msg;
222         struct iovec    iov[1];
223
224 #ifdef  HAVE_STRUCT_MSGHDR_MSG_CONTROL
225         union {
226                 struct cmsghdr  cm;
227                 char control[CMSG_SPACE(sizeof(int))];
228         } control_un;
229         struct cmsghdr  *cmptr;
230
231         ZERO_STRUCT(msg);
232         ZERO_STRUCT(control_un);
233
234         msg.msg_control = control_un.control;
235         msg.msg_controllen = sizeof(control_un.control);
236
237         cmptr = CMSG_FIRSTHDR(&msg);
238         cmptr->cmsg_len = CMSG_LEN(sizeof(int));
239         cmptr->cmsg_level = SOL_SOCKET;
240         cmptr->cmsg_type = SCM_RIGHTS;
241         memcpy(CMSG_DATA(cmptr), &sendfd, sizeof(sendfd));
242 #else
243         ZERO_STRUCT(msg);
244         msg.msg_accrights = (caddr_t) &sendfd;
245         msg.msg_accrightslen = sizeof(int);
246 #endif
247
248         msg.msg_name = NULL;
249         msg.msg_namelen = 0;
250
251         ZERO_STRUCT(iov);
252         iov[0].iov_base = (void *)ptr;
253         iov[0].iov_len = nbytes;
254         msg.msg_iov = iov;
255         msg.msg_iovlen = 1;
256
257         return (sendmsg(fd, &msg, 0));
258 }
259
260 static void aio_child_cleanup(struct tevent_context *event_ctx,
261                               struct tevent_timer *te,
262                               struct timeval now,
263                               void *private_data)
264 {
265         struct aio_child_list *list = talloc_get_type_abort(
266                 private_data, struct aio_child_list);
267         struct aio_child *child, *next;
268
269         TALLOC_FREE(list->cleanup_event);
270
271         for (child = list->children; child != NULL; child = next) {
272                 next = child->next;
273
274                 if (child->busy) {
275                         DEBUG(10, ("child %d currently active\n",
276                                    (int)child->pid));
277                         continue;
278                 }
279
280                 if (child->dont_delete) {
281                         DEBUG(10, ("Child %d was active since last cleanup\n",
282                                    (int)child->pid));
283                         child->dont_delete = false;
284                         continue;
285                 }
286
287                 DEBUG(10, ("Child %d idle for more than 30 seconds, "
288                            "deleting\n", (int)child->pid));
289
290                 TALLOC_FREE(child);
291                 child = next;
292         }
293
294         if (list->children != NULL) {
295                 /*
296                  * Re-schedule the next cleanup round
297                  */
298                 list->cleanup_event = tevent_add_timer(server_event_context(), list,
299                                                       timeval_add(&now, 30, 0),
300                                                       aio_child_cleanup, list);
301
302         }
303 }
304
305 static struct aio_child_list *init_aio_children(struct vfs_handle_struct *handle)
306 {
307         struct aio_child_list *data = NULL;
308
309         if (SMB_VFS_HANDLE_TEST_DATA(handle)) {
310                 SMB_VFS_HANDLE_GET_DATA(handle, data, struct aio_child_list,
311                                         return NULL);
312         }
313
314         if (data == NULL) {
315                 data = talloc_zero(NULL, struct aio_child_list);
316                 if (data == NULL) {
317                         return NULL;
318                 }
319         }
320
321         /*
322          * Regardless of whether the child_list had been around or not, make
323          * sure that we have a cleanup timed event. This timed event will
324          * delete itself when it finds that no children are around anymore.
325          */
326
327         if (data->cleanup_event == NULL) {
328                 data->cleanup_event = tevent_add_timer(server_event_context(), data,
329                                                       timeval_current_ofs(30, 0),
330                                                       aio_child_cleanup, data);
331                 if (data->cleanup_event == NULL) {
332                         TALLOC_FREE(data);
333                         return NULL;
334                 }
335         }
336
337         if (!SMB_VFS_HANDLE_TEST_DATA(handle)) {
338                 SMB_VFS_HANDLE_SET_DATA(handle, data, free_aio_children,
339                                         struct aio_child_list, return False);
340         }
341
342         return data;
343 }
344
345 static void aio_child_loop(int sockfd, struct mmap_area *map)
346 {
347         while (true) {
348                 int fd = -1;
349                 ssize_t ret;
350                 struct rw_cmd cmd_struct;
351                 struct rw_ret ret_struct;
352
353                 ret = read_fd(sockfd, &cmd_struct, sizeof(cmd_struct), &fd);
354                 if (ret != sizeof(cmd_struct)) {
355                         DEBUG(10, ("read_fd returned %d: %s\n", (int)ret,
356                                    strerror(errno)));
357                         exit(1);
358                 }
359
360                 DEBUG(10, ("aio_child_loop: %s %d bytes at %d from fd %d\n",
361                            cmd_type_str(cmd_struct.cmd),
362                            (int)cmd_struct.n, (int)cmd_struct.offset, fd));
363
364                 if (cmd_struct.erratic_testing_mode) {
365                         /*
366                          * For developer testing, we want erratic behaviour for
367                          * async I/O times
368                          */
369                         uint8_t randval;
370                         unsigned msecs;
371                         /*
372                          * use generate_random_buffer, we just forked from a
373                          * common parent state
374                          */
375                         generate_random_buffer(&randval, sizeof(randval));
376                         msecs = randval + 20;
377                         DEBUG(10, ("delaying for %u msecs\n", msecs));
378                         smb_msleep(msecs);
379                 }
380
381                 ZERO_STRUCT(ret_struct);
382
383                 switch (cmd_struct.cmd) {
384                 case READ_CMD:
385                         ret_struct.size = sys_pread(
386                                 fd, (void *)map->ptr, cmd_struct.n,
387                                 cmd_struct.offset);
388 #if 0
389 /* This breaks "make test" when run with aio_fork module. */
390 #ifdef DEVELOPER
391                         ret_struct.size = MAX(1, ret_struct.size * 0.9);
392 #endif
393 #endif
394                         break;
395                 case WRITE_CMD:
396                         ret_struct.size = sys_pwrite(
397                                 fd, (void *)map->ptr, cmd_struct.n,
398                                 cmd_struct.offset);
399                         break;
400                 case FSYNC_CMD:
401                         ret_struct.size = fsync(fd);
402                         break;
403                 default:
404                         ret_struct.size = -1;
405                         errno = EINVAL;
406                 }
407
408                 DEBUG(10, ("aio_child_loop: syscall returned %d\n",
409                            (int)ret_struct.size));
410
411                 if (ret_struct.size == -1) {
412                         ret_struct.ret_errno = errno;
413                 }
414
415                 /*
416                  * Close the fd before telling our parent we're done. The
417                  * parent might close and re-open the file very quickly, and
418                  * with system-level share modes (GPFS) we would get an
419                  * unjustified SHARING_VIOLATION.
420                  */
421                 close(fd);
422
423                 ret = write_data(sockfd, (char *)&ret_struct,
424                                  sizeof(ret_struct));
425                 if (ret != sizeof(ret_struct)) {
426                         DEBUG(10, ("could not write ret_struct: %s\n",
427                                    strerror(errno)));
428                         exit(2);
429                 }
430         }
431 }
432
433 static int aio_child_destructor(struct aio_child *child)
434 {
435         char c=0;
436
437         SMB_ASSERT(!child->busy);
438
439         DEBUG(10, ("aio_child_destructor: removing child %d on fd %d\n",
440                         child->pid, child->sockfd));
441
442         /*
443          * closing the sockfd makes the child not return from recvmsg() on RHEL
444          * 5.5 so instead force the child to exit by writing bad data to it
445          */
446         write(child->sockfd, &c, sizeof(c));
447         close(child->sockfd);
448         DLIST_REMOVE(child->list->children, child);
449         return 0;
450 }
451
452 /*
453  * We have to close all fd's in open files, we might incorrectly hold a system
454  * level share mode on a file.
455  */
456
457 static struct files_struct *close_fsp_fd(struct files_struct *fsp,
458                                          void *private_data)
459 {
460         if ((fsp->fh != NULL) && (fsp->fh->fd != -1)) {
461                 close(fsp->fh->fd);
462                 fsp->fh->fd = -1;
463         }
464         return NULL;
465 }
466
467 static int create_aio_child(struct smbd_server_connection *sconn,
468                             struct aio_child_list *children,
469                             size_t map_size,
470                             struct aio_child **presult)
471 {
472         struct aio_child *result;
473         int fdpair[2];
474         int ret;
475
476         fdpair[0] = fdpair[1] = -1;
477
478         result = talloc_zero(children, struct aio_child);
479         if (result == NULL) {
480                 return ENOMEM;
481         }
482
483         if (socketpair(AF_UNIX, SOCK_STREAM, 0, fdpair) == -1) {
484                 ret = errno;
485                 DEBUG(10, ("socketpair() failed: %s\n", strerror(errno)));
486                 goto fail;
487         }
488
489         DEBUG(10, ("fdpair = %d/%d\n", fdpair[0], fdpair[1]));
490
491         result->map = mmap_area_init(result, map_size);
492         if (result->map == NULL) {
493                 ret = errno;
494                 DEBUG(0, ("Could not create mmap area\n"));
495                 goto fail;
496         }
497
498         result->pid = fork();
499         if (result->pid == -1) {
500                 ret = errno;
501                 DEBUG(0, ("fork failed: %s\n", strerror(errno)));
502                 goto fail;
503         }
504
505         if (result->pid == 0) {
506                 close(fdpair[0]);
507                 result->sockfd = fdpair[1];
508                 files_forall(sconn, close_fsp_fd, NULL);
509                 aio_child_loop(result->sockfd, result->map);
510         }
511
512         DEBUG(10, ("Child %d created with sockfd %d\n",
513                         result->pid, fdpair[0]));
514
515         result->sockfd = fdpair[0];
516         close(fdpair[1]);
517
518         result->list = children;
519         DLIST_ADD(children->children, result);
520
521         talloc_set_destructor(result, aio_child_destructor);
522
523         *presult = result;
524
525         return 0;
526
527  fail:
528         if (fdpair[0] != -1) close(fdpair[0]);
529         if (fdpair[1] != -1) close(fdpair[1]);
530         TALLOC_FREE(result);
531
532         return ret;
533 }
534
535 static int get_idle_child(struct vfs_handle_struct *handle,
536                           struct aio_child **pchild)
537 {
538         struct aio_child_list *children;
539         struct aio_child *child;
540
541         children = init_aio_children(handle);
542         if (children == NULL) {
543                 return ENOMEM;
544         }
545
546         for (child = children->children; child != NULL; child = child->next) {
547                 if (!child->busy) {
548                         break;
549                 }
550         }
551
552         if (child == NULL) {
553                 int ret;
554
555                 DEBUG(10, ("no idle child found, creating new one\n"));
556
557                 ret = create_aio_child(handle->conn->sconn, children,
558                                           128*1024, &child);
559                 if (ret != 0) {
560                         DEBUG(10, ("create_aio_child failed: %s\n",
561                                    strerror(errno)));
562                         return ret;
563                 }
564         }
565
566         child->dont_delete = true;
567         child->busy = true;
568
569         *pchild = child;
570         return 0;
571 }
572
573 struct aio_fork_pread_state {
574         struct aio_child *child;
575         ssize_t ret;
576         int err;
577 };
578
579 static void aio_fork_pread_done(struct tevent_req *subreq);
580
581 static struct tevent_req *aio_fork_pread_send(struct vfs_handle_struct *handle,
582                                               TALLOC_CTX *mem_ctx,
583                                               struct tevent_context *ev,
584                                               struct files_struct *fsp,
585                                               void *data,
586                                               size_t n, off_t offset)
587 {
588         struct tevent_req *req, *subreq;
589         struct aio_fork_pread_state *state;
590         struct rw_cmd cmd;
591         ssize_t written;
592         int err;
593         struct aio_fork_config *config;
594
595         SMB_VFS_HANDLE_GET_DATA(handle, config,
596                                 struct aio_fork_config,
597                                 return NULL);
598
599         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pread_state);
600         if (req == NULL) {
601                 return NULL;
602         }
603
604         if (n > 128*1024) {
605                 /* TODO: support variable buffers */
606                 tevent_req_error(req, EINVAL);
607                 return tevent_req_post(req, ev);
608         }
609
610         err = get_idle_child(handle, &state->child);
611         if (err != 0) {
612                 tevent_req_error(req, err);
613                 return tevent_req_post(req, ev);
614         }
615
616         ZERO_STRUCT(cmd);
617         cmd.n = n;
618         cmd.offset = offset;
619         cmd.cmd = READ_CMD;
620         cmd.erratic_testing_mode = config->erratic_testing_mode;
621
622         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
623                    (int)state->child->pid));
624
625         /*
626          * Not making this async. We're writing into an empty unix
627          * domain socket. This should never block.
628          */
629         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
630                            fsp->fh->fd);
631         if (written == -1) {
632                 err = errno;
633
634                 TALLOC_FREE(state->child);
635
636                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
637                 tevent_req_error(req, err);
638                 return tevent_req_post(req, ev);
639         }
640
641         subreq = read_packet_send(state, ev, state->child->sockfd,
642                                   sizeof(struct rw_ret), NULL, NULL);
643         if (tevent_req_nomem(subreq, req)) {
644                 TALLOC_FREE(state->child); /* we sent sth down */
645                 return tevent_req_post(req, ev);
646         }
647         tevent_req_set_callback(subreq, aio_fork_pread_done, req);
648         return req;
649 }
650
651 static void aio_fork_pread_done(struct tevent_req *subreq)
652 {
653         struct tevent_req *req = tevent_req_callback_data(
654                 subreq, struct tevent_req);
655         struct aio_fork_pread_state *state = tevent_req_data(
656                 req, struct aio_fork_pread_state);
657         ssize_t nread;
658         uint8_t *buf;
659         int err;
660         struct rw_ret *retbuf;
661
662         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
663         TALLOC_FREE(subreq);
664         if (nread == -1) {
665                 TALLOC_FREE(state->child);
666                 tevent_req_error(req, err);
667                 return;
668         }
669
670         state->child->busy = false;
671
672         retbuf = (struct rw_ret *)buf;
673         state->ret = retbuf->size;
674         state->err = retbuf->ret_errno;
675         tevent_req_done(req);
676 }
677
678 static ssize_t aio_fork_pread_recv(struct tevent_req *req, int *err)
679 {
680         struct aio_fork_pread_state *state = tevent_req_data(
681                 req, struct aio_fork_pread_state);
682
683         if (tevent_req_is_unix_error(req, err)) {
684                 return -1;
685         }
686         if (state->ret == -1) {
687                 *err = state->err;
688         }
689         return state->ret;
690 }
691
692 struct aio_fork_pwrite_state {
693         struct aio_child *child;
694         ssize_t ret;
695         int err;
696 };
697
698 static void aio_fork_pwrite_done(struct tevent_req *subreq);
699
700 static struct tevent_req *aio_fork_pwrite_send(
701         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
702         struct tevent_context *ev, struct files_struct *fsp,
703         const void *data, size_t n, off_t offset)
704 {
705         struct tevent_req *req, *subreq;
706         struct aio_fork_pwrite_state *state;
707         struct rw_cmd cmd;
708         ssize_t written;
709         int err;
710         struct aio_fork_config *config;
711         SMB_VFS_HANDLE_GET_DATA(handle, config,
712                                 struct aio_fork_config,
713                                 return NULL);
714
715         req = tevent_req_create(mem_ctx, &state, struct aio_fork_pwrite_state);
716         if (req == NULL) {
717                 return NULL;
718         }
719
720         if (n > 128*1024) {
721                 /* TODO: support variable buffers */
722                 tevent_req_error(req, EINVAL);
723                 return tevent_req_post(req, ev);
724         }
725
726         err = get_idle_child(handle, &state->child);
727         if (err != 0) {
728                 tevent_req_error(req, err);
729                 return tevent_req_post(req, ev);
730         }
731
732         ZERO_STRUCT(cmd);
733         cmd.n = n;
734         cmd.offset = offset;
735         cmd.cmd = WRITE_CMD;
736         cmd.erratic_testing_mode = config->erratic_testing_mode;
737
738         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
739                    (int)state->child->pid));
740
741         /*
742          * Not making this async. We're writing into an empty unix
743          * domain socket. This should never block.
744          */
745         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
746                            fsp->fh->fd);
747         if (written == -1) {
748                 err = errno;
749
750                 TALLOC_FREE(state->child);
751
752                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
753                 tevent_req_error(req, err);
754                 return tevent_req_post(req, ev);
755         }
756
757         subreq = read_packet_send(state, ev, state->child->sockfd,
758                                   sizeof(struct rw_ret), NULL, NULL);
759         if (tevent_req_nomem(subreq, req)) {
760                 TALLOC_FREE(state->child); /* we sent sth down */
761                 return tevent_req_post(req, ev);
762         }
763         tevent_req_set_callback(subreq, aio_fork_pwrite_done, req);
764         return req;
765 }
766
767 static void aio_fork_pwrite_done(struct tevent_req *subreq)
768 {
769         struct tevent_req *req = tevent_req_callback_data(
770                 subreq, struct tevent_req);
771         struct aio_fork_pwrite_state *state = tevent_req_data(
772                 req, struct aio_fork_pwrite_state);
773         ssize_t nread;
774         uint8_t *buf;
775         int err;
776         struct rw_ret *retbuf;
777
778         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
779         TALLOC_FREE(subreq);
780         if (nread == -1) {
781                 TALLOC_FREE(state->child);
782                 tevent_req_error(req, err);
783                 return;
784         }
785
786         state->child->busy = false;
787
788         retbuf = (struct rw_ret *)buf;
789         state->ret = retbuf->size;
790         state->err = retbuf->ret_errno;
791         tevent_req_done(req);
792 }
793
794 static ssize_t aio_fork_pwrite_recv(struct tevent_req *req, int *err)
795 {
796         struct aio_fork_pwrite_state *state = tevent_req_data(
797                 req, struct aio_fork_pwrite_state);
798
799         if (tevent_req_is_unix_error(req, err)) {
800                 return -1;
801         }
802         if (state->ret == -1) {
803                 *err = state->err;
804         }
805         return state->ret;
806 }
807
808 struct aio_fork_fsync_state {
809         struct aio_child *child;
810         ssize_t ret;
811         int err;
812 };
813
814 static void aio_fork_fsync_done(struct tevent_req *subreq);
815
816 static struct tevent_req *aio_fork_fsync_send(
817         struct vfs_handle_struct *handle, TALLOC_CTX *mem_ctx,
818         struct tevent_context *ev, struct files_struct *fsp)
819 {
820         struct tevent_req *req, *subreq;
821         struct aio_fork_fsync_state *state;
822         struct rw_cmd cmd;
823         ssize_t written;
824         int err;
825         struct aio_fork_config *config;
826
827         SMB_VFS_HANDLE_GET_DATA(handle, config,
828                                 struct aio_fork_config,
829                                 return NULL);
830
831         req = tevent_req_create(mem_ctx, &state, struct aio_fork_fsync_state);
832         if (req == NULL) {
833                 return NULL;
834         }
835
836         err = get_idle_child(handle, &state->child);
837         if (err != 0) {
838                 tevent_req_error(req, err);
839                 return tevent_req_post(req, ev);
840         }
841
842         ZERO_STRUCT(cmd);
843         cmd.cmd = FSYNC_CMD;
844         cmd.erratic_testing_mode = config->erratic_testing_mode;
845
846         DEBUG(10, ("sending fd %d to child %d\n", fsp->fh->fd,
847                    (int)state->child->pid));
848
849         /*
850          * Not making this async. We're writing into an empty unix
851          * domain socket. This should never block.
852          */
853         written = write_fd(state->child->sockfd, &cmd, sizeof(cmd),
854                            fsp->fh->fd);
855         if (written == -1) {
856                 err = errno;
857
858                 TALLOC_FREE(state->child);
859
860                 DEBUG(10, ("write_fd failed: %s\n", strerror(err)));
861                 tevent_req_error(req, err);
862                 return tevent_req_post(req, ev);
863         }
864
865         subreq = read_packet_send(state, ev, state->child->sockfd,
866                                   sizeof(struct rw_ret), NULL, NULL);
867         if (tevent_req_nomem(subreq, req)) {
868                 TALLOC_FREE(state->child); /* we sent sth down */
869                 return tevent_req_post(req, ev);
870         }
871         tevent_req_set_callback(subreq, aio_fork_fsync_done, req);
872         return req;
873 }
874
875 static void aio_fork_fsync_done(struct tevent_req *subreq)
876 {
877         struct tevent_req *req = tevent_req_callback_data(
878                 subreq, struct tevent_req);
879         struct aio_fork_fsync_state *state = tevent_req_data(
880                 req, struct aio_fork_fsync_state);
881         ssize_t nread;
882         uint8_t *buf;
883         int err;
884         struct rw_ret *retbuf;
885
886         nread = read_packet_recv(subreq, talloc_tos(), &buf, &err);
887         TALLOC_FREE(subreq);
888         if (nread == -1) {
889                 TALLOC_FREE(state->child);
890                 tevent_req_error(req, err);
891                 return;
892         }
893
894         state->child->busy = false;
895
896         retbuf = (struct rw_ret *)buf;
897         state->ret = retbuf->size;
898         state->err = retbuf->ret_errno;
899         tevent_req_done(req);
900 }
901
902 static int aio_fork_fsync_recv(struct tevent_req *req, int *err)
903 {
904         struct aio_fork_fsync_state *state = tevent_req_data(
905                 req, struct aio_fork_fsync_state);
906
907         if (tevent_req_is_unix_error(req, err)) {
908                 return -1;
909         }
910         if (state->ret == -1) {
911                 *err = state->err;
912         }
913         return state->ret;
914 }
915
916 static int aio_fork_connect(vfs_handle_struct *handle, const char *service,
917                             const char *user)
918 {
919         int ret;
920         struct aio_fork_config *config;
921         ret = SMB_VFS_NEXT_CONNECT(handle, service, user);
922
923         if (ret < 0) {
924                 return ret;
925         }
926
927         config = talloc_zero(handle->conn, struct aio_fork_config);
928         if (!config) {
929                 SMB_VFS_NEXT_DISCONNECT(handle);
930                 DEBUG(0, ("talloc_zero() failed\n"));
931                 return -1;
932         }
933
934         config->erratic_testing_mode = lp_parm_bool(SNUM(handle->conn), "vfs_aio_fork",
935                                                     "erratic_testing_mode", false);
936         
937         SMB_VFS_HANDLE_SET_DATA(handle, config,
938                                 NULL, struct aio_fork_config,
939                                 return -1);
940
941         /*********************************************************************
942          * How many threads to initialize ?
943          * 100 per process seems insane as a default until you realize that
944          * (a) Threads terminate after 1 second when idle.
945          * (b) Throttling is done in SMB2 via the crediting algorithm.
946          * (c) SMB1 clients are limited to max_mux (50) outstanding
947          *     requests and Windows clients don't use this anyway.
948          * Essentially we want this to be unlimited unless smb.conf
949          * says different.
950          *********************************************************************/
951         aio_pending_size = 100;
952         return 0;
953 }
954
955 static struct vfs_fn_pointers vfs_aio_fork_fns = {
956         .connect_fn = aio_fork_connect,
957         .pread_send_fn = aio_fork_pread_send,
958         .pread_recv_fn = aio_fork_pread_recv,
959         .pwrite_send_fn = aio_fork_pwrite_send,
960         .pwrite_recv_fn = aio_fork_pwrite_recv,
961         .fsync_send_fn = aio_fork_fsync_send,
962         .fsync_recv_fn = aio_fork_fsync_recv,
963 };
964
965 NTSTATUS vfs_aio_fork_init(void);
966 NTSTATUS vfs_aio_fork_init(void)
967 {
968         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
969                                 "aio_fork", &vfs_aio_fork_fns);
970 }