s3-aio-pthread: num threads should be int
[metze/samba/wip.git] / source3 / modules / vfs_aio_pthread.c
1 /*
2  * Simulate Posix AIO using pthreads.
3  *
4  * Based on the aio_fork work from Volker and Volker's pthreadpool library.
5  *
6  * Copyright (C) Volker Lendecke 2008
7  * Copyright (C) Jeremy Allison 2012
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include "includes.h"
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "lib/pthreadpool/pthreadpool.h"
29
30 struct aio_extra;
31 static struct pthreadpool *pool;
32 static int aio_pthread_jobid;
33
34 struct aio_private_data {
35         struct aio_private_data *prev, *next;
36         int jobid;
37         SMB_STRUCT_AIOCB *aiocb;
38         ssize_t ret_size;
39         int ret_errno;
40         bool cancelled;
41         bool write_command;
42 };
43
44 /* List of outstanding requests we have. */
45 static struct aio_private_data *pd_list;
46
47 static void aio_pthread_handle_completion(struct event_context *event_ctx,
48                                 struct fd_event *event,
49                                 uint16 flags,
50                                 void *p);
51
52 /************************************************************************
53  How many threads to initialize ?
54  100 per process seems insane as a default until you realize that
55  (a) Threads terminate after 1 second when idle.
56  (b) Throttling is done in SMB2 via the crediting algorithm.
57  (c) SMB1 clients are limited to max_mux (50) outstanding requests and
58      Windows clients don't use this anyway.
59  Essentially we want this to be unlimited unless smb.conf says different.
60 ***********************************************************************/
61
62 static int aio_get_num_threads(struct vfs_handle_struct *handle)
63 {
64         return lp_parm_int(SNUM(handle->conn),
65                            "aio_pthread", "aio num threads", 100);
66 }
67
68 /************************************************************************
69  Ensure thread pool is initialized.
70 ***********************************************************************/
71
72 static bool init_aio_threadpool(struct vfs_handle_struct *handle)
73 {
74         struct fd_event *sock_event = NULL;
75         int ret = 0;
76         int num_threads;
77
78         if (pool) {
79                 return true;
80         }
81
82         num_threads = aio_get_num_threads(handle);
83         ret = pthreadpool_init(num_threads, &pool);
84         if (ret) {
85                 errno = ret;
86                 return false;
87         }
88         sock_event = tevent_add_fd(server_event_context(),
89                                 NULL,
90                                 pthreadpool_signal_fd(pool),
91                                 TEVENT_FD_READ,
92                                 aio_pthread_handle_completion,
93                                 NULL);
94         if (sock_event == NULL) {
95                 pthreadpool_destroy(pool);
96                 pool = NULL;
97                 return false;
98         }
99
100         DEBUG(10,("init_aio_threadpool: initialized with up to %d threads\n",
101                         num_threads));
102
103         return true;
104 }
105
106
107 /************************************************************************
108  Worker function - core of the pthread aio engine.
109  This is the function that actually does the IO.
110 ***********************************************************************/
111
112 static void aio_worker(void *private_data)
113 {
114         struct aio_private_data *pd =
115                         (struct aio_private_data *)private_data;
116
117         if (pd->write_command) {
118                 pd->ret_size = pwrite(pd->aiocb->aio_fildes,
119                                 (const void *)pd->aiocb->aio_buf,
120                                 pd->aiocb->aio_nbytes,
121                                 pd->aiocb->aio_offset);
122         } else {
123                 pd->ret_size = pread(pd->aiocb->aio_fildes,
124                                 (void *)pd->aiocb->aio_buf,
125                                 pd->aiocb->aio_nbytes,
126                                 pd->aiocb->aio_offset);
127         }
128         if (pd->ret_size == -1) {
129                 pd->ret_errno = errno;
130         } else {
131                 pd->ret_errno = 0;
132         }
133 }
134
135 /************************************************************************
136  Private data destructor.
137 ***********************************************************************/
138
139 static int pd_destructor(struct aio_private_data *pd)
140 {
141         DLIST_REMOVE(pd_list, pd);
142         return 0;
143 }
144
145 /************************************************************************
146  Create and initialize a private data struct.
147 ***********************************************************************/
148
149 static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
150                                         SMB_STRUCT_AIOCB *aiocb)
151 {
152         struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
153         if (!pd) {
154                 return NULL;
155         }
156         pd->jobid = aio_pthread_jobid++;
157         pd->aiocb = aiocb;
158         pd->ret_size = -1;
159         pd->ret_errno = EINPROGRESS;
160         talloc_set_destructor(pd, pd_destructor);
161         DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
162         return pd;
163 }
164
165 /************************************************************************
166  Spin off a threadpool (if needed) and initiate a pread call.
167 ***********************************************************************/
168
169 static int aio_pthread_read(struct vfs_handle_struct *handle,
170                                 struct files_struct *fsp,
171                                 SMB_STRUCT_AIOCB *aiocb)
172 {
173         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
174         struct aio_private_data *pd = NULL;
175         int ret;
176
177         if (!init_aio_threadpool(handle)) {
178                 return -1;
179         }
180
181         pd = create_private_data(aio_ex, aiocb);
182         if (pd == NULL) {
183                 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
184                 return -1;
185         }
186
187         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
188         if (ret) {
189                 errno = ret;
190                 return -1;
191         }
192
193         DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
194                 "of %llu bytes at offset %llu\n",
195                 pd->jobid,
196                 (unsigned long long)pd->aiocb->aio_nbytes,
197                 (unsigned long long)pd->aiocb->aio_offset));
198
199         return 0;
200 }
201
202 /************************************************************************
203  Spin off a threadpool (if needed) and initiate a pwrite call.
204 ***********************************************************************/
205
206 static int aio_pthread_write(struct vfs_handle_struct *handle,
207                                 struct files_struct *fsp,
208                                 SMB_STRUCT_AIOCB *aiocb)
209 {
210         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
211         struct aio_private_data *pd = NULL;
212         int ret;
213
214         if (!init_aio_threadpool(handle)) {
215                 return -1;
216         }
217
218         pd = create_private_data(aio_ex, aiocb);
219         if (pd == NULL) {
220                 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
221                 return -1;
222         }
223
224         pd->write_command = true;
225
226         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
227         if (ret) {
228                 errno = ret;
229                 return -1;
230         }
231
232         DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
233                 "of %llu bytes at offset %llu\n",
234                 pd->jobid,
235                 (unsigned long long)pd->aiocb->aio_nbytes,
236                 (unsigned long long)pd->aiocb->aio_offset));
237
238         return 0;
239 }
240
241 /************************************************************************
242  Find the private data by jobid.
243 ***********************************************************************/
244
245 static struct aio_private_data *find_private_data_by_jobid(int jobid)
246 {
247         struct aio_private_data *pd;
248
249         for (pd = pd_list; pd != NULL; pd = pd->next) {
250                 if (pd->jobid == jobid) {
251                         return pd;
252                 }
253         }
254
255         return NULL;
256 }
257
258 /************************************************************************
259  Callback when an IO completes.
260 ***********************************************************************/
261
262 static void aio_pthread_handle_completion(struct event_context *event_ctx,
263                                 struct fd_event *event,
264                                 uint16 flags,
265                                 void *p)
266 {
267         struct aio_extra *aio_ex = NULL;
268         struct aio_private_data *pd = NULL;
269         int jobid = 0;
270         int ret;
271
272         DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
273                         (int)flags));
274
275         if ((flags & EVENT_FD_READ) == 0) {
276                 return;
277         }
278
279         ret = pthreadpool_finished_job(pool, &jobid);
280         if (ret) {
281                 smb_panic("aio_pthread_handle_completion");
282                 return;
283         }
284
285         pd = find_private_data_by_jobid(jobid);
286         if (pd == NULL) {
287                 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
288                           jobid));
289                 return;
290         }
291
292         aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
293         smbd_aio_complete_aio_ex(aio_ex);
294
295         DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
296                 jobid ));
297         TALLOC_FREE(aio_ex);
298 }
299
300 /************************************************************************
301  Find the private data by aiocb.
302 ***********************************************************************/
303
304 static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
305 {
306         struct aio_private_data *pd;
307
308         for (pd = pd_list; pd != NULL; pd = pd->next) {
309                 if (pd->aiocb == aiocb) {
310                         return pd;
311                 }
312         }
313
314         return NULL;
315 }
316
317 /************************************************************************
318  Called to return the result of a completed AIO.
319  Should only be called if aio_error returns something other than EINPROGRESS.
320  Returns:
321         Any other value - return from IO operation.
322 ***********************************************************************/
323
324 static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
325                                 struct files_struct *fsp,
326                                 SMB_STRUCT_AIOCB *aiocb)
327 {
328         struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
329
330         if (pd == NULL) {
331                 errno = EINVAL;
332                 DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
333                 return -1;
334         }
335
336         pd->aiocb = NULL;
337
338         if (pd->ret_size == -1) {
339                 errno = pd->ret_errno;
340         }
341
342         return pd->ret_size;
343 }
344
345 /************************************************************************
346  Called to check the result of an AIO.
347  Returns:
348         EINPROGRESS - still in progress.
349         EINVAL - invalid aiocb.
350         ECANCELED - request was cancelled.
351         0 - request completed successfully.
352         Any other value - errno from IO operation.
353 ***********************************************************************/
354
355 static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
356                              struct files_struct *fsp,
357                              SMB_STRUCT_AIOCB *aiocb)
358 {
359         struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
360
361         if (pd == NULL) {
362                 return EINVAL;
363         }
364         if (pd->cancelled) {
365                 return ECANCELED;
366         }
367         return pd->ret_errno;
368 }
369
370 /************************************************************************
371  Called to request the cancel of an AIO, or all of them on a specific
372  fsp if aiocb == NULL.
373 ***********************************************************************/
374
375 static int aio_pthread_cancel(struct vfs_handle_struct *handle,
376                         struct files_struct *fsp,
377                         SMB_STRUCT_AIOCB *aiocb)
378 {
379         struct aio_private_data *pd = NULL;
380
381         for (pd = pd_list; pd != NULL; pd = pd->next) {
382                 if (pd->aiocb == NULL) {
383                         continue;
384                 }
385                 if (pd->aiocb->aio_fildes != fsp->fh->fd) {
386                         continue;
387                 }
388                 if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
389                         continue;
390                 }
391
392                 /*
393                  * We let the child do its job, but we discard the result when
394                  * it's finished.
395                  */
396
397                 pd->cancelled = true;
398         }
399
400         return AIO_CANCELED;
401 }
402
403 /************************************************************************
404  Callback for a previously detected job completion.
405 ***********************************************************************/
406
407 static void aio_pthread_handle_immediate(struct tevent_context *ctx,
408                                 struct tevent_immediate *im,
409                                 void *private_data)
410 {
411         struct aio_extra *aio_ex = NULL;
412         int *pjobid = (int *)private_data;
413         struct aio_private_data *pd = find_private_data_by_jobid(*pjobid);
414
415         if (pd == NULL) {
416                 DEBUG(1, ("aio_pthread_handle_immediate cannot find jobid %d\n",
417                           *pjobid));
418                 TALLOC_FREE(pjobid);
419                 return;
420         }
421
422         TALLOC_FREE(pjobid);
423         aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
424         smbd_aio_complete_aio_ex(aio_ex);
425         TALLOC_FREE(aio_ex);
426 }
427
428 /************************************************************************
429  Private data struct used in suspend completion code.
430 ***********************************************************************/
431
432 struct suspend_private {
433         int num_entries;
434         int num_finished;
435         const SMB_STRUCT_AIOCB * const *aiocb_array;
436 };
437
438 /************************************************************************
439  Callback when an IO completes from a suspend call.
440 ***********************************************************************/
441
442 static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
443                                 struct fd_event *event,
444                                 uint16 flags,
445                                 void *p)
446 {
447         struct suspend_private *sp = (struct suspend_private *)p;
448         struct aio_private_data *pd = NULL;
449         struct tevent_immediate *im = NULL;
450         int *pjobid = NULL;
451         int i;
452
453         DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
454                         (int)flags));
455
456         if ((flags & EVENT_FD_READ) == 0) {
457                 return;
458         }
459
460         pjobid = talloc_array(NULL, int, 1);
461         if (pjobid) {
462                 smb_panic("aio_pthread_handle_suspend_completion: no memory.");
463         }
464
465         if (pthreadpool_finished_job(pool, pjobid)) {
466                 smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
467                 return;
468         }
469
470         pd = find_private_data_by_jobid(*pjobid);
471         if (pd == NULL) {
472                 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
473                           *pjobid));
474                 TALLOC_FREE(pjobid);
475                 return;
476         }
477
478         /* Is this a jobid with an aiocb we're interested in ? */
479         for (i = 0; i < sp->num_entries; i++) {
480                 if (sp->aiocb_array[i] == pd->aiocb) {
481                         sp->num_finished++;
482                         TALLOC_FREE(pjobid);
483                         return;
484                 }
485         }
486
487         /* Jobid completed we weren't waiting for.
488            We must reshedule this as an immediate event
489            on the main event context. */
490         im = tevent_create_immediate(NULL);
491         if (!im) {
492                 exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
493         }
494
495         DEBUG(10,("aio_pthread_handle_suspend_completion: "
496                         "re-scheduling job id %d\n",
497                         *pjobid));
498
499         tevent_schedule_immediate(im,
500                         server_event_context(),
501                         aio_pthread_handle_immediate,
502                         (void *)pjobid);
503 }
504
505
/************************************************************************
 Timer callback for aio_pthread_suspend().

 private_data points at the caller's bool flag; the timer is freed and
 the flag is set to true.
***********************************************************************/

static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
                                        struct tevent_timer *te,
                                        struct timeval now,
                                        void *private_data)
{
        bool *expired_flag = (bool *)private_data;

        /* Remove this timed event handler. */
        TALLOC_FREE(te);

        *expired_flag = true;
}
516
517 /************************************************************************
518  Called to request everything to stop until all IO is completed.
519 ***********************************************************************/
520
521 static int aio_pthread_suspend(struct vfs_handle_struct *handle,
522                         struct files_struct *fsp,
523                         const SMB_STRUCT_AIOCB * const aiocb_array[],
524                         int n,
525                         const struct timespec *timeout)
526 {
527         struct event_context *ev = NULL;
528         struct fd_event *sock_event = NULL;
529         int ret = -1;
530         struct suspend_private sp;
531         bool timed_out = false;
532         TALLOC_CTX *frame = talloc_stackframe();
533
534         /* This is a blocking call, and has to use a sub-event loop. */
535         ev = event_context_init(frame);
536         if (ev == NULL) {
537                 errno = ENOMEM;
538                 goto out;
539         }
540
541         if (timeout) {
542                 struct timeval tv = convert_timespec_to_timeval(*timeout);
543                 struct tevent_timer *te = tevent_add_timer(ev,
544                                                 frame,
545                                                 timeval_current_ofs(tv.tv_sec,
546                                                                     tv.tv_usec),
547                                                 aio_pthread_suspend_timed_out,
548                                                 &timed_out);
549                 if (!te) {
550                         errno = ENOMEM;
551                         goto out;
552                 }
553         }
554
555         ZERO_STRUCT(sp);
556         sp.num_entries = n;
557         sp.aiocb_array = aiocb_array;
558         sp.num_finished = 0;
559
560         sock_event = tevent_add_fd(ev,
561                                 frame,
562                                 pthreadpool_signal_fd(pool),
563                                 TEVENT_FD_READ,
564                                 aio_pthread_handle_suspend_completion,
565                                 (void *)&sp);
566         if (sock_event == NULL) {
567                 pthreadpool_destroy(pool);
568                 pool = NULL;
569                 goto out;
570         }
571         /*
572          * We're going to cheat here. We know that smbd/aio.c
573          * only calls this when it's waiting for every single
574          * outstanding call to finish on a close, so just wait
575          * individually for each IO to complete. We don't care
576          * what order they finish - only that they all do. JRA.
577          */
578         while (sp.num_entries != sp.num_finished) {
579                 if (tevent_loop_once(ev) == -1) {
580                         goto out;
581                 }
582
583                 if (timed_out) {
584                         errno = EAGAIN;
585                         goto out;
586                 }
587         }
588
589         ret = 0;
590
591   out:
592
593         TALLOC_FREE(frame);
594         return ret;
595 }
596
597 static struct vfs_fn_pointers vfs_aio_pthread_fns = {
598         .aio_read_fn = aio_pthread_read,
599         .aio_write_fn = aio_pthread_write,
600         .aio_return_fn = aio_pthread_return_fn,
601         .aio_cancel_fn = aio_pthread_cancel,
602         .aio_error_fn = aio_pthread_error_fn,
603         .aio_suspend_fn = aio_pthread_suspend,
604 };
605
606 NTSTATUS vfs_aio_pthread_init(void);
607 NTSTATUS vfs_aio_pthread_init(void)
608 {
609         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
610                                 "aio_pthread", &vfs_aio_pthread_fns);
611 }