s3: Avoid a call to server_event_context()
source3/modules/vfs_aio_pthread.c
/*
 * Simulate Posix AIO using pthreads.
 *
 * Based on the aio_fork work from Volker and Volker's pthreadpool library.
 *
 * Copyright (C) Volker Lendecke 2008
 * Copyright (C) Jeremy Allison 2012
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 3 of the License, or
 * (at your option) any later version.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with this program; if not, write to the Free Software
 * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
 */

#include "includes.h"
#include "system/filesys.h"
#include "system/shmem.h"
#include "smbd/smbd.h"
#include "smbd/globals.h"
#include "lib/pthreadpool/pthreadpool.h"

struct aio_extra;
static struct pthreadpool *pool;
static int aio_pthread_jobid;

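/* Per-request state for one pread/pwrite job handed to the thread pool. */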
struct aio_private_data {
        struct aio_private_data *prev, *next;
        int jobid;
        SMB_STRUCT_AIOCB *aiocb;
        ssize_t ret_size;
        int ret_errno;
        bool cancelled;
        bool write_command;
};

/* List of outstanding requests we have. */
static struct aio_private_data *pd_list;

static void aio_pthread_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p);


/************************************************************************
 Ensure thread pool is initialized.
***********************************************************************/

static bool init_aio_threadpool(struct vfs_handle_struct *handle)
{
        struct fd_event *sock_event = NULL;
        int ret = 0;

        if (pool) {
                return true;
        }

        ret = pthreadpool_init(aio_pending_size, &pool);
        if (ret) {
                errno = ret;
                return false;
        }
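        /*
         * Completions are signalled by the pool's signal fd becoming
         * readable; hook it into the connection's tevent context so
         * aio_pthread_handle_completion() runs in the main event loop.
         */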
        sock_event = tevent_add_fd(handle->conn->sconn->ev_ctx,
                                NULL,
                                pthreadpool_signal_fd(pool),
                                TEVENT_FD_READ,
                                aio_pthread_handle_completion,
                                NULL);
        if (sock_event == NULL) {
                pthreadpool_destroy(pool);
                pool = NULL;
                return false;
        }

        DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
                  aio_pending_size));

        return true;
}


/************************************************************************
 Worker function - core of the pthread aio engine.
 This is the function that actually does the IO.
***********************************************************************/

static void aio_worker(void *private_data)
{
        struct aio_private_data *pd =
                        (struct aio_private_data *)private_data;

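        /*
         * Runs on a pool thread: do the IO synchronously and stash the
         * result in pd. The main thread collects it once the pool
         * signals completion.
         */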
        if (pd->write_command) {
                pd->ret_size = sys_pwrite(pd->aiocb->aio_fildes,
                                (const void *)pd->aiocb->aio_buf,
                                pd->aiocb->aio_nbytes,
                                pd->aiocb->aio_offset);
                if (pd->ret_size == -1 && errno == ESPIPE) {
                        /* Maintain the fiction that pipes can
                           be seeked (sought?) on. */
                        pd->ret_size = sys_write(pd->aiocb->aio_fildes,
                                        (const void *)pd->aiocb->aio_buf,
                                        pd->aiocb->aio_nbytes);
                }
        } else {
                pd->ret_size = sys_pread(pd->aiocb->aio_fildes,
                                (void *)pd->aiocb->aio_buf,
                                pd->aiocb->aio_nbytes,
                                pd->aiocb->aio_offset);
                if (pd->ret_size == -1 && errno == ESPIPE) {
                        /* Maintain the fiction that pipes can
                           be seeked (sought?) on. */
                        pd->ret_size = sys_read(pd->aiocb->aio_fildes,
                                        (void *)pd->aiocb->aio_buf,
                                        pd->aiocb->aio_nbytes);
                }
        }
        if (pd->ret_size == -1) {
                pd->ret_errno = errno;
        } else {
                pd->ret_errno = 0;
        }
}

/************************************************************************
 Private data destructor.
***********************************************************************/

static int pd_destructor(struct aio_private_data *pd)
{
        DLIST_REMOVE(pd_list, pd);
        return 0;
}

/************************************************************************
 Create and initialize a private data struct.
***********************************************************************/

static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
                                        SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
        if (!pd) {
                return NULL;
        }
        pd->jobid = aio_pthread_jobid++;
        pd->aiocb = aiocb;
        pd->ret_size = -1;
        pd->ret_errno = EINPROGRESS;
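        /* pd is talloc-parented to the caller's context (the aio_extra in
           this module); when that is freed the destructor below unlinks
           pd from pd_list. */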
        talloc_set_destructor(pd, pd_destructor);
        DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
        return pd;
}

/************************************************************************
 Spin off a threadpool (if needed) and initiate a pread call.
***********************************************************************/

static int aio_pthread_read(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
        struct aio_private_data *pd = NULL;
        int ret;

        if (!init_aio_threadpool(handle)) {
                return -1;
        }

        pd = create_private_data(aio_ex, aiocb);
        if (pd == NULL) {
                DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
                return -1;
        }

        ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
        if (ret) {
                errno = ret;
                return -1;
        }

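        /* The job is now queued; completion is reported asynchronously via
           the pool's signal fd and handled in aio_pthread_handle_completion(). */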
        DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
                "of %llu bytes at offset %llu\n",
                pd->jobid,
                (unsigned long long)pd->aiocb->aio_nbytes,
                (unsigned long long)pd->aiocb->aio_offset));

        return 0;
}

/************************************************************************
 Spin off a threadpool (if needed) and initiate a pwrite call.
***********************************************************************/

static int aio_pthread_write(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
        struct aio_private_data *pd = NULL;
        int ret;

        if (!init_aio_threadpool(handle)) {
                return -1;
        }

        pd = create_private_data(aio_ex, aiocb);
        if (pd == NULL) {
                DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
                return -1;
        }

        pd->write_command = true;

        ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
        if (ret) {
                errno = ret;
                return -1;
        }

        DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
                "of %llu bytes at offset %llu\n",
                pd->jobid,
                (unsigned long long)pd->aiocb->aio_nbytes,
                (unsigned long long)pd->aiocb->aio_offset));

        return 0;
}

/************************************************************************
 Find the private data by jobid.
***********************************************************************/

static struct aio_private_data *find_private_data_by_jobid(int jobid)
{
        struct aio_private_data *pd;

        for (pd = pd_list; pd != NULL; pd = pd->next) {
                if (pd->jobid == jobid) {
                        return pd;
                }
        }

        return NULL;
}

/************************************************************************
 Callback when an IO completes.
***********************************************************************/

static void aio_pthread_handle_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct aio_extra *aio_ex = NULL;
        struct aio_private_data *pd = NULL;
        int jobid = 0;
        int ret;

        DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
                        (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        ret = pthreadpool_finished_job(pool, &jobid);
        if (ret) {
                smb_panic("aio_pthread_handle_completion");
                return;
        }

        pd = find_private_data_by_jobid(jobid);
        if (pd == NULL) {
                DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
                          jobid));
                return;
        }

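        /* Hand the result back to the core aio code; freeing aio_ex also
           frees pd (a talloc child of it), which unlinks it from pd_list. */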
        aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
        smbd_aio_complete_aio_ex(aio_ex);

        DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
                jobid ));
        TALLOC_FREE(aio_ex);
}

/************************************************************************
 Find the private data by aiocb.
***********************************************************************/

static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd;

        for (pd = pd_list; pd != NULL; pd = pd->next) {
                if (pd->aiocb == aiocb) {
                        return pd;
                }
        }

        return NULL;
}

/************************************************************************
 Called to return the result of a completed AIO.
 Should only be called if aio_error returns something other than EINPROGRESS.
 Returns:
        -1 - error (errno set to EINVAL, ECANCELED or the errno from the IO).
        Any other value - return from the IO operation.
***********************************************************************/

static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
                                struct files_struct *fsp,
                                SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

        if (pd == NULL) {
                errno = EINVAL;
                DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
                return -1;
        }

        pd->aiocb = NULL;

        if (pd->cancelled) {
                errno = ECANCELED;
                return -1;
        }

        if (pd->ret_size == -1) {
                errno = pd->ret_errno;
        }

        return pd->ret_size;
}

/************************************************************************
 Called to check the result of an AIO.
 Returns:
        EINPROGRESS - still in progress.
        EINVAL - invalid aiocb.
        ECANCELED - request was cancelled.
        0 - request completed successfully.
        Any other value - errno from IO operation.
***********************************************************************/

static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
                             struct files_struct *fsp,
                             SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);

        if (pd == NULL) {
                return EINVAL;
        }
        if (pd->cancelled) {
                return ECANCELED;
        }
        return pd->ret_errno;
}

/************************************************************************
 Called to request the cancel of an AIO, or all of them on a specific
 fsp if aiocb == NULL.
***********************************************************************/

static int aio_pthread_cancel(struct vfs_handle_struct *handle,
                        struct files_struct *fsp,
                        SMB_STRUCT_AIOCB *aiocb)
{
        struct aio_private_data *pd = NULL;

        for (pd = pd_list; pd != NULL; pd = pd->next) {
                if (pd->aiocb == NULL) {
                        continue;
                }
                if (pd->aiocb->aio_fildes != fsp->fh->fd) {
                        continue;
                }
                if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
                        continue;
                }

                /*
                 * We let the worker thread do its job, but we discard the
                 * result when it's finished.
                 */

                pd->cancelled = true;
        }

        return AIO_CANCELED;
}

/************************************************************************
 Callback for a previously detected job completion.
***********************************************************************/

static void aio_pthread_handle_immediate(struct tevent_context *ctx,
                                struct tevent_immediate *im,
                                void *private_data)
{
        struct aio_extra *aio_ex = NULL;
        struct aio_private_data *pd = (struct aio_private_data *)private_data;

        aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
        smbd_aio_complete_aio_ex(aio_ex);
        TALLOC_FREE(aio_ex);
}

/************************************************************************
 Private data struct used in suspend completion code.
***********************************************************************/

struct suspend_private {
        int num_entries;
        int num_finished;
        const SMB_STRUCT_AIOCB * const *aiocb_array;
};

/************************************************************************
 Callback when an IO completes from a suspend call.
***********************************************************************/

static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
                                struct fd_event *event,
                                uint16 flags,
                                void *p)
{
        struct suspend_private *sp = (struct suspend_private *)p;
        struct aio_private_data *pd = NULL;
        struct tevent_immediate *im = NULL;
        int jobid;
        int i;

        DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
                        (int)flags));

        if ((flags & EVENT_FD_READ) == 0) {
                return;
        }

        if (pthreadpool_finished_job(pool, &jobid)) {
                smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
                return;
        }

        pd = find_private_data_by_jobid(jobid);
        if (pd == NULL) {
                DEBUG(1, ("aio_pthread_handle_suspend_completion cannot find jobid %d\n",
                          jobid));
                return;
        }

        /* Is this a jobid with an aiocb we're interested in ? */
        for (i = 0; i < sp->num_entries; i++) {
                if (sp->aiocb_array[i] == pd->aiocb) {
                        sp->num_finished++;
                        return;
                }
        }

        /* A jobid completed that we weren't waiting for.
           We must reschedule this as an immediate event
           on the main event context. */
        im = tevent_create_immediate(NULL);
        if (!im) {
                exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
        }

        DEBUG(10,("aio_pthread_handle_suspend_completion: "
                        "re-scheduling job id %d\n",
                        jobid));

        tevent_schedule_immediate(im,
                        server_event_context(),
                        aio_pthread_handle_immediate,
                        (void *)pd);
}


static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
                                        struct tevent_timer *te,
                                        struct timeval now,
                                        void *private_data)
{
        bool *timed_out = (bool *)private_data;
        /* Remove this timed event handler. */
        TALLOC_FREE(te);
        *timed_out = true;
}

/************************************************************************
 Called to request everything to stop until all IO is completed.
***********************************************************************/

static int aio_pthread_suspend(struct vfs_handle_struct *handle,
                        struct files_struct *fsp,
                        const SMB_STRUCT_AIOCB * const aiocb_array[],
                        int n,
                        const struct timespec *timeout)
{
        struct event_context *ev = NULL;
        struct fd_event *sock_event = NULL;
        int ret = -1;
        struct suspend_private sp;
        bool timed_out = false;
        TALLOC_CTX *frame = talloc_stackframe();

        /* This is a blocking call, and has to use a sub-event loop. */
        ev = event_context_init(frame);
        if (ev == NULL) {
                errno = ENOMEM;
                goto out;
        }

        if (timeout) {
                struct timeval tv = convert_timespec_to_timeval(*timeout);
                struct tevent_timer *te = tevent_add_timer(ev,
                                                frame,
                                                timeval_current_ofs(tv.tv_sec,
                                                                    tv.tv_usec),
                                                aio_pthread_suspend_timed_out,
                                                &timed_out);
                if (!te) {
                        errno = ENOMEM;
                        goto out;
                }
        }

        ZERO_STRUCT(sp);
        sp.num_entries = n;
        sp.aiocb_array = aiocb_array;
        sp.num_finished = 0;

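        /* Watch the pool's signal fd on the private sub-event context, so
           completions that arrive while we block here are handled by
           aio_pthread_handle_suspend_completion() rather than the main loop. */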
        sock_event = tevent_add_fd(ev,
                                frame,
                                pthreadpool_signal_fd(pool),
                                TEVENT_FD_READ,
                                aio_pthread_handle_suspend_completion,
                                (void *)&sp);
        if (sock_event == NULL) {
                pthreadpool_destroy(pool);
                pool = NULL;
                goto out;
        }
        /*
         * We're going to cheat here. We know that smbd/aio.c
         * only calls this when it's waiting for every single
         * outstanding call to finish on a close, so just wait
         * individually for each IO to complete. We don't care
         * what order they finish - only that they all do. JRA.
         */
        while (sp.num_entries != sp.num_finished) {
                if (tevent_loop_once(ev) == -1) {
                        goto out;
                }

                if (timed_out) {
                        errno = EAGAIN;
                        goto out;
                }
        }

        ret = 0;

  out:

        TALLOC_FREE(frame);
        return ret;
}

static int aio_pthread_connect(vfs_handle_struct *handle, const char *service,
                               const char *user)
{
        /*********************************************************************
         * How many threads to initialize ?
         * 100 per process seems insane as a default until you realize that
         * (a) Threads terminate after 1 second when idle.
         * (b) Throttling is done in SMB2 via the crediting algorithm.
         * (c) SMB1 clients are limited to max_mux (50) outstanding
         *     requests and Windows clients don't use this anyway.
         * Essentially we want this to be unlimited unless smb.conf
         * says different.
         *********************************************************************/
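        /* The limit can be overridden per share in smb.conf, e.g.:
         *
         *     [share]
         *         vfs objects = aio_pthread
         *         aio_pthread:aio num threads = 20
         *
         * ("[share]" and the value 20 are illustrative; the parameter name
         * matches the lp_parm_int() lookup below.)
         */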
        aio_pending_size = lp_parm_int(
                SNUM(handle->conn), "aio_pthread", "aio num threads", 100);
        return SMB_VFS_NEXT_CONNECT(handle, service, user);
}

static struct vfs_fn_pointers vfs_aio_pthread_fns = {
        .connect_fn = aio_pthread_connect,
        .aio_read_fn = aio_pthread_read,
        .aio_write_fn = aio_pthread_write,
        .aio_return_fn = aio_pthread_return_fn,
        .aio_cancel_fn = aio_pthread_cancel,
        .aio_error_fn = aio_pthread_error_fn,
        .aio_suspend_fn = aio_pthread_suspend,
};

NTSTATUS vfs_aio_pthread_init(void);
NTSTATUS vfs_aio_pthread_init(void)
{
        return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
                                "aio_pthread", &vfs_aio_pthread_fns);
}