Add some debug to vfs_aio_pthread so I can see when jobs start and stop.
[samba.git] / source3 / modules / vfs_aio_pthread.c
1 /*
2  * Simulate Posix AIO using pthreads.
3  *
4  * Based on the aio_fork work from Volker and Volker's pthreadpool library.
5  *
6  * Copyright (C) Volker Lendecke 2008
7  * Copyright (C) Jeremy Allison 2012
8  *
9  * This program is free software; you can redistribute it and/or modify
10  * it under the terms of the GNU General Public License as published by
11  * the Free Software Foundation; either version 3 of the License, or
12  * (at your option) any later version.
13  *
14  * This program is distributed in the hope that it will be useful,
15  * but WITHOUT ANY WARRANTY; without even the implied warranty of
16  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17  * GNU General Public License for more details.
18  *
19  * You should have received a copy of the GNU General Public License
20  * along with this program; if not, write to the Free Software
21  * Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
22  */
23
24 #include "includes.h"
25 #include "system/filesys.h"
26 #include "system/shmem.h"
27 #include "smbd/smbd.h"
28 #include "pthreadpool.h"
29
30 struct aio_extra;
31 static struct pthreadpool *pool;
32 static int aio_pthread_jobid;
33
34 struct aio_private_data {
35         struct aio_private_data *prev, *next;
36         int jobid;
37         SMB_STRUCT_AIOCB *aiocb;
38         ssize_t ret_size;
39         int ret_errno;
40         bool cancelled;
41         bool write_command;
42 };
43
44 /* List of outstanding requests we have. */
45 struct aio_private_data *pd_list;
46
47 static void aio_pthread_handle_completion(struct event_context *event_ctx,
48                                 struct fd_event *event,
49                                 uint16 flags,
50                                 void *p);
51
/************************************************************************
 Number of worker threads the pool is created with.
***********************************************************************/

static int aio_get_num_threads(void)
{
        enum { AIO_PTHREAD_POOL_SIZE = 10 };

        return AIO_PTHREAD_POOL_SIZE;
}
60
61 #if 0
62 /************************************************************************
63  Called every 30 seconds to destroy pool if it's idle.
64 ***********************************************************************/
65
66 static void idle_pool_destroy_timer(struct tevent_context *ev,
67                         struct tevent_timer *te,
68                         struct timeval current_time,
69                         void *private_data)
70 {
71         struct timeval ne;
72
73         TALLOC_FREE(te);
74
75         if (pool && pd_list == NULL) {
76                 if (pthreadpool_destroy(pool) == 0) {
77                         pool = NULL;
78                 }
79                 DEBUG(10,("idle_pool_destroy_timer: destroyed AIO pool.\n"));
80                 return;
81         }
82
83         /* Here, the IO is still active. */
84
85         /* Set an event up for 30 seconds time - if we have
86            no outstanding IO at this time shut the threadpool
87            down. */
88         ne = tevent_timeval_current_ofs(30, 0);
89         tevent_add_timer(server_event_context(),
90                         NULL,
91                         ne,
92                         idle_pool_destroy_timer,
93                         NULL);
94 }
95 #endif
96
97 /************************************************************************
98  Ensure thread pool is initialized.
99 ***********************************************************************/
100
101 static bool init_aio_threadpool(void)
102 {
103         struct fd_event *sock_event = NULL;
104         int ret = 0;
105         int num_threads = aio_get_num_threads();
106 #if 0
107         struct timeval ne;
108 #endif
109
110         if (pool) {
111                 return true;
112         }
113
114         ret = pthreadpool_init(num_threads, &pool);
115         if (ret) {
116                 errno = ret;
117                 return false;
118         }
119         sock_event = tevent_add_fd(server_event_context(),
120                                 NULL,
121                                 pthreadpool_signal_fd(pool),
122                                 TEVENT_FD_READ,
123                                 aio_pthread_handle_completion,
124                                 NULL);
125         if (sock_event == NULL) {
126                 pthreadpool_destroy(pool);
127                 pool = NULL;
128                 return false;
129         }
130
131 #if 0
132         /* Set an event up for 30 seconds time - if we have
133            no outstanding IO at this time shut the threadpool
134            down. */
135         ne = tevent_timeval_current_ofs(30, 0);
136         tevent_add_timer(server_event_context(),
137                         NULL,
138                         ne,
139                         idle_pool_destroy_timer,
140                         NULL);
141 #endif
142
143         DEBUG(10,("init_aio_threadpool: initialized with %d threads\n",
144                         num_threads));
145
146         return true;
147 }
148
149
150 /************************************************************************
151  Worker function - core of the pthread aio engine.
152  This is the function that actually does the IO.
153 ***********************************************************************/
154
155 static void aio_worker(void *private_data)
156 {
157         struct aio_private_data *pd =
158                         (struct aio_private_data *)private_data;
159
160         if (pd->write_command) {
161                 pd->ret_size = pwrite(pd->aiocb->aio_fildes,
162                                 (const void *)pd->aiocb->aio_buf,
163                                 pd->aiocb->aio_nbytes,
164                                 pd->aiocb->aio_offset);
165         } else {
166                 pd->ret_size = pread(pd->aiocb->aio_fildes,
167                                 (void *)pd->aiocb->aio_buf,
168                                 pd->aiocb->aio_nbytes,
169                                 pd->aiocb->aio_offset);
170         }
171         if (pd->ret_size == -1) {
172                 pd->ret_errno = errno;
173         } else {
174                 pd->ret_errno = 0;
175         }
176 }
177
178 /************************************************************************
179  Private data destructor.
180 ***********************************************************************/
181
182 static int pd_destructor(struct aio_private_data *pd)
183 {
184         DLIST_REMOVE(pd_list, pd);
185         return 0;
186 }
187
188 /************************************************************************
189  Create and initialize a private data struct.
190 ***********************************************************************/
191
192 static struct aio_private_data *create_private_data(TALLOC_CTX *ctx,
193                                         SMB_STRUCT_AIOCB *aiocb)
194 {
195         struct aio_private_data *pd = talloc_zero(ctx, struct aio_private_data);
196         if (!pd) {
197                 return NULL;
198         }
199         pd->jobid = aio_pthread_jobid++;
200         pd->aiocb = aiocb;
201         pd->ret_size = -1;
202         pd->ret_errno = EINPROGRESS;
203         talloc_set_destructor(pd, pd_destructor);
204         DLIST_ADD_END(pd_list, pd, struct aio_private_data *);
205         return pd;
206 }
207
208 /************************************************************************
209  Spin off a threadpool (if needed) and initiate a pread call.
210 ***********************************************************************/
211
212 static int aio_pthread_read(struct vfs_handle_struct *handle,
213                                 struct files_struct *fsp,
214                                 SMB_STRUCT_AIOCB *aiocb)
215 {
216         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
217         struct aio_private_data *pd = NULL;
218         int ret;
219
220         if (!init_aio_threadpool()) {
221                 return -1;
222         }
223
224         pd = create_private_data(aio_ex, aiocb);
225         if (pd == NULL) {
226                 DEBUG(10, ("aio_pthread_read: Could not create private data.\n"));
227                 return -1;
228         }
229
230         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
231         if (ret) {
232                 errno = ret;
233                 return -1;
234         }
235
236         DEBUG(10, ("aio_pthread_read: jobid=%d pread requested "
237                 "of %llu bytes at offset %llu\n",
238                 pd->jobid,
239                 (unsigned long long)pd->aiocb->aio_nbytes,
240                 (unsigned long long)pd->aiocb->aio_offset));
241
242         return 0;
243 }
244
245 /************************************************************************
246  Spin off a threadpool (if needed) and initiate a pwrite call.
247 ***********************************************************************/
248
249 static int aio_pthread_write(struct vfs_handle_struct *handle,
250                                 struct files_struct *fsp,
251                                 SMB_STRUCT_AIOCB *aiocb)
252 {
253         struct aio_extra *aio_ex = (struct aio_extra *)aiocb->aio_sigevent.sigev_value.sival_ptr;
254         struct aio_private_data *pd = NULL;
255         int ret;
256
257         if (!init_aio_threadpool()) {
258                 return -1;
259         }
260
261         pd = create_private_data(aio_ex, aiocb);
262         if (pd == NULL) {
263                 DEBUG(10, ("aio_pthread_write: Could not create private data.\n"));
264                 return -1;
265         }
266
267         pd->write_command = true;
268
269         ret = pthreadpool_add_job(pool, pd->jobid, aio_worker, (void *)pd);
270         if (ret) {
271                 errno = ret;
272                 return -1;
273         }
274
275         DEBUG(10, ("aio_pthread_write: jobid=%d pwrite requested "
276                 "of %llu bytes at offset %llu\n",
277                 pd->jobid,
278                 (unsigned long long)pd->aiocb->aio_nbytes,
279                 (unsigned long long)pd->aiocb->aio_offset));
280
281         return 0;
282 }
283
284 /************************************************************************
285  Find the private data by jobid.
286 ***********************************************************************/
287
288 static struct aio_private_data *find_private_data_by_jobid(int jobid)
289 {
290         struct aio_private_data *pd;
291
292         for (pd = pd_list; pd != NULL; pd = pd->next) {
293                 if (pd->jobid == jobid) {
294                         return pd;
295                 }
296         }
297
298         return NULL;
299 }
300
301 /************************************************************************
302  Callback when an IO completes.
303 ***********************************************************************/
304
305 static void aio_pthread_handle_completion(struct event_context *event_ctx,
306                                 struct fd_event *event,
307                                 uint16 flags,
308                                 void *p)
309 {
310         struct aio_extra *aio_ex = NULL;
311         struct aio_private_data *pd = NULL;
312         int jobid = 0;
313         int ret;
314
315         DEBUG(10, ("aio_pthread_handle_completion called with flags=%d\n",
316                         (int)flags));
317
318         if ((flags & EVENT_FD_READ) == 0) {
319                 return;
320         }
321
322         ret = pthreadpool_finished_job(pool, &jobid);
323         if (ret) {
324                 smb_panic("aio_pthread_handle_completion");
325                 return;
326         }
327
328         pd = find_private_data_by_jobid(jobid);
329         if (pd == NULL) {
330                 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
331                           jobid));
332                 return;
333         }
334
335         aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
336         smbd_aio_complete_aio_ex(aio_ex);
337
338         DEBUG(10,("aio_pthread_handle_completion: jobid %d completed\n",
339                 jobid ));
340
341 }
342
343 /************************************************************************
344  Find the private data by aiocb.
345 ***********************************************************************/
346
347 static struct aio_private_data *find_private_data_by_aiocb(SMB_STRUCT_AIOCB *aiocb)
348 {
349         struct aio_private_data *pd;
350
351         for (pd = pd_list; pd != NULL; pd = pd->next) {
352                 if (pd->aiocb == aiocb) {
353                         return pd;
354                 }
355         }
356
357         return NULL;
358 }
359
360 /************************************************************************
361  Called to return the result of a completed AIO.
362  Should only be called if aio_error returns something other than EINPROGRESS.
363  Returns:
364         Any other value - return from IO operation.
365 ***********************************************************************/
366
367 static ssize_t aio_pthread_return_fn(struct vfs_handle_struct *handle,
368                                 struct files_struct *fsp,
369                                 SMB_STRUCT_AIOCB *aiocb)
370 {
371         struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
372
373         if (pd == NULL) {
374                 errno = EINVAL;
375                 DEBUG(0, ("aio_pthread_return_fn: returning EINVAL\n"));
376                 return -1;
377         }
378
379         pd->aiocb = NULL;
380
381         if (pd->ret_size == -1) {
382                 errno = pd->ret_errno;
383         }
384
385         return pd->ret_size;
386 }
387
388 /************************************************************************
389  Called to check the result of an AIO.
390  Returns:
391         EINPROGRESS - still in progress.
392         EINVAL - invalid aiocb.
393         ECANCELED - request was cancelled.
394         0 - request completed successfully.
395         Any other value - errno from IO operation.
396 ***********************************************************************/
397
398 static int aio_pthread_error_fn(struct vfs_handle_struct *handle,
399                              struct files_struct *fsp,
400                              SMB_STRUCT_AIOCB *aiocb)
401 {
402         struct aio_private_data *pd = find_private_data_by_aiocb(aiocb);
403
404         if (pd == NULL) {
405                 return EINVAL;
406         }
407         if (pd->cancelled) {
408                 return ECANCELED;
409         }
410         return pd->ret_errno;
411 }
412
413 /************************************************************************
414  Called to request the cancel of an AIO, or all of them on a specific
415  fsp if aiocb == NULL.
416 ***********************************************************************/
417
418 static int aio_pthread_cancel(struct vfs_handle_struct *handle,
419                         struct files_struct *fsp,
420                         SMB_STRUCT_AIOCB *aiocb)
421 {
422         struct aio_private_data *pd = NULL;
423
424         for (pd = pd_list; pd != NULL; pd = pd->next) {
425                 if (pd->aiocb == NULL) {
426                         continue;
427                 }
428                 if (pd->aiocb->aio_fildes != fsp->fh->fd) {
429                         continue;
430                 }
431                 if ((aiocb != NULL) && (pd->aiocb != aiocb)) {
432                         continue;
433                 }
434
435                 /*
436                  * We let the child do its job, but we discard the result when
437                  * it's finished.
438                  */
439
440                 pd->cancelled = true;
441         }
442
443         return AIO_CANCELED;
444 }
445
446 /************************************************************************
447  Callback for a previously detected job completion.
448 ***********************************************************************/
449
450 static void aio_pthread_handle_immediate(struct tevent_context *ctx,
451                                 struct tevent_immediate *im,
452                                 void *private_data)
453 {
454         struct aio_extra *aio_ex = NULL;
455         int *pjobid = (int *)private_data;
456         struct aio_private_data *pd = find_private_data_by_jobid(*pjobid);
457
458         if (pd == NULL) {
459                 DEBUG(1, ("aio_pthread_handle_immediate cannot find jobid %d\n",
460                           *pjobid));
461                 TALLOC_FREE(pjobid);
462                 return;
463         }
464
465         TALLOC_FREE(pjobid);
466         aio_ex = (struct aio_extra *)pd->aiocb->aio_sigevent.sigev_value.sival_ptr;
467         smbd_aio_complete_aio_ex(aio_ex);
468 }
469
470 /************************************************************************
471  Private data struct used in suspend completion code.
472 ***********************************************************************/
473
474 struct suspend_private {
475         int num_entries;
476         int num_finished;
477         const SMB_STRUCT_AIOCB * const *aiocb_array;
478 };
479
480 /************************************************************************
481  Callback when an IO completes from a suspend call.
482 ***********************************************************************/
483
484 static void aio_pthread_handle_suspend_completion(struct event_context *event_ctx,
485                                 struct fd_event *event,
486                                 uint16 flags,
487                                 void *p)
488 {
489         struct suspend_private *sp = (struct suspend_private *)p;
490         struct aio_private_data *pd = NULL;
491         struct tevent_immediate *im = NULL;
492         int *pjobid = NULL;
493         int i;
494
495         DEBUG(10, ("aio_pthread_handle_suspend_completion called with flags=%d\n",
496                         (int)flags));
497
498         if ((flags & EVENT_FD_READ) == 0) {
499                 return;
500         }
501
502         pjobid = talloc_array(NULL, int, 1);
503         if (pjobid) {
504                 smb_panic("aio_pthread_handle_suspend_completion: no memory.");
505         }
506
507         if (pthreadpool_finished_job(pool, pjobid)) {
508                 smb_panic("aio_pthread_handle_suspend_completion: can't find job.");
509                 return;
510         }
511
512         pd = find_private_data_by_jobid(*pjobid);
513         if (pd == NULL) {
514                 DEBUG(1, ("aio_pthread_handle_completion cannot find jobid %d\n",
515                           *pjobid));
516                 TALLOC_FREE(pjobid);
517                 return;
518         }
519
520         /* Is this a jobid with an aiocb we're interested in ? */
521         for (i = 0; i < sp->num_entries; i++) {
522                 if (sp->aiocb_array[i] == pd->aiocb) {
523                         sp->num_finished++;
524                         TALLOC_FREE(pjobid);
525                         return;
526                 }
527         }
528
529         /* Jobid completed we weren't waiting for.
530            We must reshedule this as an immediate event
531            on the main event context. */
532         im = tevent_create_immediate(NULL);
533         if (!im) {
534                 exit_server_cleanly("aio_pthread_handle_suspend_completion: no memory");
535         }
536
537         DEBUG(10,("aio_pthread_handle_suspend_completion: "
538                         "re-scheduling job id %d\n",
539                         *pjobid));
540
541         tevent_schedule_immediate(im,
542                         server_event_context(),
543                         aio_pthread_handle_immediate,
544                         (void *)pjobid);
545 }
546
547
/*
 * Timer handler used by aio_pthread_suspend(): private_data points at
 * the caller's bool, which is set to true to signal the timeout.
 */
static void aio_pthread_suspend_timed_out(struct tevent_context *event_ctx,
                                        struct tevent_timer *te,
                                        struct timeval now,
                                        void *private_data)
{
        bool *expired = (bool *)private_data;

        /* Remove this timed event handler. */
        TALLOC_FREE(te);

        *expired = true;
}
558
559 /************************************************************************
560  Called to request everything to stop until all IO is completed.
561 ***********************************************************************/
562
563 static int aio_pthread_suspend(struct vfs_handle_struct *handle,
564                         struct files_struct *fsp,
565                         const SMB_STRUCT_AIOCB * const aiocb_array[],
566                         int n,
567                         const struct timespec *timeout)
568 {
569         struct event_context *ev = NULL;
570         struct fd_event *sock_event = NULL;
571         int ret = -1;
572         struct suspend_private sp;
573         bool timed_out = false;
574         TALLOC_CTX *frame = talloc_stackframe();
575
576         /* This is a blocking call, and has to use a sub-event loop. */
577         ev = event_context_init(frame);
578         if (ev == NULL) {
579                 errno = ENOMEM;
580                 goto out;
581         }
582
583         if (timeout) {
584                 struct timeval tv = convert_timespec_to_timeval(*timeout);
585                 struct tevent_timer *te = tevent_add_timer(ev,
586                                                 frame,
587                                                 timeval_current_ofs(tv.tv_sec,
588                                                                     tv.tv_usec),
589                                                 aio_pthread_suspend_timed_out,
590                                                 &timed_out);
591                 if (!te) {
592                         errno = ENOMEM;
593                         goto out;
594                 }
595         }
596
597         ZERO_STRUCT(sp);
598         sp.num_entries = n;
599         sp.aiocb_array = aiocb_array;
600         sp.num_finished = 0;
601
602         sock_event = tevent_add_fd(ev,
603                                 frame,
604                                 pthreadpool_signal_fd(pool),
605                                 TEVENT_FD_READ,
606                                 aio_pthread_handle_suspend_completion,
607                                 (void *)&sp);
608         if (sock_event == NULL) {
609                 pthreadpool_destroy(pool);
610                 pool = NULL;
611                 goto out;
612         }
613         /*
614          * We're going to cheat here. We know that smbd/aio.c
615          * only calls this when it's waiting for every single
616          * outstanding call to finish on a close, so just wait
617          * individually for each IO to complete. We don't care
618          * what order they finish - only that they all do. JRA.
619          */
620         while (sp.num_entries != sp.num_finished) {
621                 if (tevent_loop_once(ev) == -1) {
622                         goto out;
623                 }
624
625                 if (timed_out) {
626                         errno = EAGAIN;
627                         goto out;
628                 }
629         }
630
631         ret = 0;
632
633   out:
634
635         TALLOC_FREE(frame);
636         return ret;
637 }
638
639 static struct vfs_fn_pointers vfs_aio_pthread_fns = {
640         .aio_read_fn = aio_pthread_read,
641         .aio_write_fn = aio_pthread_write,
642         .aio_return_fn = aio_pthread_return_fn,
643         .aio_cancel_fn = aio_pthread_cancel,
644         .aio_error_fn = aio_pthread_error_fn,
645         .aio_suspend_fn = aio_pthread_suspend,
646 };
647
648 NTSTATUS vfs_aio_pthread_init(void);
649 NTSTATUS vfs_aio_pthread_init(void)
650 {
651         return smb_register_vfs(SMB_VFS_INTERFACE_VERSION,
652                                 "aio_pthread", &vfs_aio_pthread_fns);
653 }