This patch make it possible to build the events library completely
[metze/samba/wip.git] / source / lib / events / events_aio.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    main select loop and event handling - aio/epoll hybrid implementation
5
6    Copyright (C) Andrew Tridgell        2006
7
8    based on events_standard.c
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 3 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23 /*
24   this is a very strange beast. The Linux AIO implementation doesn't
25   yet integrate properly with epoll, but there is a kernel patch that
26   allows the aio wait primitives to be used to wait for epoll events,
27   and this can be used to give us a unified event system incorporating
28   both aio events and epoll events
29
30   this is _very_ experimental code
31 */
32
33 #include "system/filesys.h"
34 #include "system/network.h"
35 #include "events.h"
36 #include "events_internal.h"
37 #include <sys/epoll.h>
38 #include <libaio.h>
39
40 #define MAX_AIO_QUEUE_DEPTH     100
41 #ifndef IOCB_CMD_EPOLL_WAIT
42 #define IOCB_CMD_EPOLL_WAIT     9
43 #endif
44
45 struct aio_event_context {
46         /* a pointer back to the generic event_context */
47         struct event_context *ev;
48
49         /* list of filedescriptor events */
50         struct fd_event *fd_events;
51
52         /* number of registered fd event handlers */
53         int num_fd_events;
54
55         uint32_t destruction_count;
56
57         io_context_t ioctx;
58
59         struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
60
61         struct iocb *epoll_iocb;
62
63         int epoll_fd;
64         int is_epoll_set;
65         pid_t pid;
66 };
67
68 struct aio_event {
69         struct event_context *event_ctx;
70         struct iocb iocb;
71         void *private_data;
72         event_aio_handler_t handler;
73 };
74
75 /*
76   map from EVENT_FD_* to EPOLLIN/EPOLLOUT
77 */
78 static uint32_t epoll_map_flags(uint16_t flags)
79 {
80         uint32_t ret = 0;
81         if (flags & EVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
82         if (flags & EVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
83         return ret;
84 }
85
86 /*
87  free the epoll fd
88 */
89 static int aio_ctx_destructor(struct aio_event_context *aio_ev)
90 {
91         io_queue_release(aio_ev->ioctx);
92         close(aio_ev->epoll_fd);
93         aio_ev->epoll_fd = -1;
94         return 0;
95 }
96
97 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde);
98
99 /*
100   reopen the epoll handle when our pid changes
101   see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for an 
102   demonstration of why this is needed
103  */
104 static void epoll_check_reopen(struct aio_event_context *aio_ev)
105 {
106         struct fd_event *fde;
107
108         if (aio_ev->pid == getpid()) {
109                 return;
110         }
111
112         close(aio_ev->epoll_fd);
113         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
114         if (aio_ev->epoll_fd == -1) {
115                 ev_debug(aio_ev->ev, EV_DEBUG_FATAL, "Failed to recreate epoll handle after fork\n");
116                 return;
117         }
118         aio_ev->pid = getpid();
119         for (fde=aio_ev->fd_events;fde;fde=fde->next) {
120                 epoll_add_event(aio_ev, fde);
121         }
122 }
123
124 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT      (1<<0)
125 #define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR   (1<<1)
126 #define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR      (1<<2)
127
128 /*
129  add the epoll event to the given fd_event
130 */
131 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde)
132 {
133         struct epoll_event event;
134         if (aio_ev->epoll_fd == -1) return;
135
136         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
137
138         /* if we don't want events yet, don't add an aio_event */
139         if (fde->flags == 0) return;
140
141         ZERO_STRUCT(event);
142         event.events = epoll_map_flags(fde->flags);
143         event.data.ptr = fde;
144         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
145         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
146
147         /* only if we want to read we want to tell the event handler about errors */
148         if (fde->flags & EVENT_FD_READ) {
149                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
150         }
151 }
152
153 /*
154  delete the epoll event for given fd_event
155 */
156 static void epoll_del_event(struct aio_event_context *aio_ev, struct fd_event *fde)
157 {
158         struct epoll_event event;
159
160         DLIST_REMOVE(aio_ev->fd_events, fde);
161
162         if (aio_ev->epoll_fd == -1) return;
163
164         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
165
166         /* if there's no aio_event, we don't need to delete it */
167         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
168
169         ZERO_STRUCT(event);
170         event.events = epoll_map_flags(fde->flags);
171         event.data.ptr = fde;
172         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
173
174         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
175 }
176
177 /*
178  change the epoll event to the given fd_event
179 */
180 static void epoll_mod_event(struct aio_event_context *aio_ev, struct fd_event *fde)
181 {
182         struct epoll_event event;
183         if (aio_ev->epoll_fd == -1) return;
184
185         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
186
187         ZERO_STRUCT(event);
188         event.events = epoll_map_flags(fde->flags);
189         event.data.ptr = fde;
190         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
191
192         /* only if we want to read we want to tell the event handler about errors */
193         if (fde->flags & EVENT_FD_READ) {
194                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
195         }
196 }
197
198 static void epoll_change_event(struct aio_event_context *aio_ev, struct fd_event *fde)
199 {
200         bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
201         bool want_read = (fde->flags & EVENT_FD_READ);
202         bool want_write= (fde->flags & EVENT_FD_WRITE);
203
204         if (aio_ev->epoll_fd == -1) return;
205
206         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
207
208         /* there's already an event */
209         if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
210                 if (want_read || (want_write && !got_error)) {
211                         epoll_mod_event(aio_ev, fde);
212                         return;
213                 }
214                 epoll_del_event(aio_ev, fde);
215                 return;
216         }
217
218         /* there's no aio_event attached to the fde */
219         if (want_read || (want_write && !got_error)) {
220                 DLIST_ADD(aio_ev->fd_events, fde);
221                 epoll_add_event(aio_ev, fde);
222                 return;
223         }
224 }
225
226 static int setup_epoll_wait(struct aio_event_context *aio_ev)
227 {
228         if (aio_ev->is_epoll_set) {
229                 return 0;
230         }
231         memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
232         aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
233         aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
234         aio_ev->epoll_iocb->aio_reqprio = 0;
235
236         aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
237         aio_ev->epoll_iocb->u.c.offset = -1;
238         aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
239
240         if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
241                 return -1;
242         }
243         aio_ev->is_epoll_set = 1;
244
245         return 0;
246 }
247
248
249 /*
250   event loop handling using aio/epoll hybrid
251 */
252 static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
253 {
254         int ret, i;
255         uint32_t destruction_count = ++aio_ev->destruction_count;
256         struct timespec timeout;
257         struct io_event events[8];
258
259         if (aio_ev->epoll_fd == -1) return -1;
260
261         if (aio_ev->ev->num_signal_handlers && 
262             common_event_check_signal(aio_ev->ev)) {
263                 return 0;
264         }
265
266         if (tvalp) {
267                 timeout.tv_sec = tvalp->tv_sec;
268                 timeout.tv_nsec = tvalp->tv_usec;
269                 timeout.tv_nsec *= 1000;
270         }
271
272         if (setup_epoll_wait(aio_ev) < 0) 
273                 return -1;
274
275         ret = io_getevents(aio_ev->ioctx, 1, 8,
276                            events, tvalp?&timeout:NULL);
277
278         if (ret == -EINTR) {
279                 if (aio_ev->ev->num_signal_handlers) {
280                         common_event_check_signal(aio_ev->ev);
281                 }
282                 return 0;
283         }
284
285         if (ret == 0 && tvalp) {
286                 /* we don't care about a possible delay here */
287                 common_event_loop_timer_delay(aio_ev->ev);
288                 return 0;
289         }
290
291         for (i=0;i<ret;i++) {
292                 struct io_event *event = &events[i];
293                 struct iocb *finished = event->obj;
294
295                 switch (finished->aio_lio_opcode) {
296                 case IO_CMD_PWRITE:
297                 case IO_CMD_PREAD: {
298                         struct aio_event *ae = talloc_get_type(finished->data, 
299                                                                struct aio_event);
300                         if (ae) {
301                                 talloc_set_destructor(ae, NULL);
302                                 ae->handler(ae->event_ctx, ae, 
303                                             event->res, ae->private_data);
304                                 talloc_free(ae);
305                         }
306                         break;
307                 }
308                 case IOCB_CMD_EPOLL_WAIT: {
309                         struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
310                         struct fd_event *fde;
311                         uint16_t flags = 0;
312                         int j;
313
314                         aio_ev->is_epoll_set = 0;
315
316                         for (j=0; j<event->res; j++, ep++) {
317                                 fde = talloc_get_type(ep->data.ptr, 
318                                                       struct fd_event);
319                                 if (fde == NULL) {
320                                         return -1;
321                                 }
322                                 if (ep->events & (EPOLLHUP|EPOLLERR)) {
323                                         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
324                                         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
325                                                 epoll_del_event(aio_ev, fde);
326                                                 continue;
327                                         }
328                                         flags |= EVENT_FD_READ;
329                                 }
330                                 if (ep->events & EPOLLIN) flags |= EVENT_FD_READ;
331                                 if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE;
332                                 if (flags) {
333                                         fde->handler(aio_ev->ev, fde, flags, fde->private_data);
334                                 }
335                         }
336                         break;
337                 }
338                 }
339                 if (destruction_count != aio_ev->destruction_count) {
340                         return 0;
341                 }
342         }
343
344         return 0;
345 }
346
347 /*
348   create a aio_event_context structure.
349 */
350 static int aio_event_context_init(struct event_context *ev)
351 {
352         struct aio_event_context *aio_ev;
353         
354         aio_ev = talloc_zero(ev, struct aio_event_context);
355         if (!aio_ev) return -1;
356
357         aio_ev->ev = ev;
358         aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
359
360         if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
361                 talloc_free(aio_ev);
362                 return -1;
363         }
364
365         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
366         if (aio_ev->epoll_fd == -1) {
367                 talloc_free(aio_ev);
368                 return -1;
369         }
370         aio_ev->pid = getpid();
371
372         talloc_set_destructor(aio_ev, aio_ctx_destructor);
373
374         ev->additional_data = aio_ev;
375
376         if (setup_epoll_wait(aio_ev) < 0) {
377                 talloc_free(aio_ev);
378                 return -1;
379         }
380
381         return 0;
382 }
383
384 /*
385   destroy an fd_event
386 */
387 static int aio_event_fd_destructor(struct fd_event *fde)
388 {
389         struct event_context *ev = fde->event_ctx;
390         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
391                                                            struct aio_event_context);
392
393         epoll_check_reopen(aio_ev);
394
395         aio_ev->num_fd_events--;
396         aio_ev->destruction_count++;
397
398         epoll_del_event(aio_ev, fde);
399
400         if (fde->flags & EVENT_FD_AUTOCLOSE) {
401                 close(fde->fd);
402                 fde->fd = -1;
403         }
404
405         return 0;
406 }
407
408 /*
409   add a fd based event
410   return NULL on failure (memory allocation error)
411 */
412 static struct fd_event *aio_event_add_fd(struct event_context *ev, TALLOC_CTX *mem_ctx,
413                                          int fd, uint16_t flags,
414                                          event_fd_handler_t handler,
415                                          void *private_data)
416 {
417         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
418                                                            struct aio_event_context);
419         struct fd_event *fde;
420
421         epoll_check_reopen(aio_ev);
422
423         fde = talloc(mem_ctx?mem_ctx:ev, struct fd_event);
424         if (!fde) return NULL;
425
426         fde->event_ctx          = ev;
427         fde->fd                 = fd;
428         fde->flags              = flags;
429         fde->handler            = handler;
430         fde->private_data       = private_data;
431         fde->additional_flags   = 0;
432         fde->additional_data    = NULL;
433
434         aio_ev->num_fd_events++;
435         talloc_set_destructor(fde, aio_event_fd_destructor);
436
437         DLIST_ADD(aio_ev->fd_events, fde);
438         epoll_add_event(aio_ev, fde);
439
440         return fde;
441 }
442
443
444 /*
445   return the fd event flags
446 */
447 static uint16_t aio_event_get_fd_flags(struct fd_event *fde)
448 {
449         return fde->flags;
450 }
451
452 /*
453   set the fd event flags
454 */
455 static void aio_event_set_fd_flags(struct fd_event *fde, uint16_t flags)
456 {
457         struct event_context *ev;
458         struct aio_event_context *aio_ev;
459
460         if (fde->flags == flags) return;
461
462         ev = fde->event_ctx;
463         aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
464
465         fde->flags = flags;
466
467         epoll_check_reopen(aio_ev);
468
469         epoll_change_event(aio_ev, fde);
470 }
471
472 /*
473   do a single event loop using the events defined in ev 
474 */
475 static int aio_event_loop_once(struct event_context *ev)
476 {
477         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
478                                                            struct aio_event_context);
479         struct timeval tval;
480
481         tval = common_event_loop_timer_delay(ev);
482         if (timeval_is_zero(&tval)) {
483                 return 0;
484         }
485
486         epoll_check_reopen(aio_ev);
487
488         return aio_event_loop(aio_ev, &tval);
489 }
490
491 /*
492   return on failure or (with 0) if all fd events are removed
493 */
494 static int aio_event_loop_wait(struct event_context *ev)
495 {
496         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
497                                                            struct aio_event_context);
498         while (aio_ev->num_fd_events) {
499                 if (aio_event_loop_once(ev) != 0) {
500                         break;
501                 }
502         }
503
504         return 0;
505 }
506
507 /*
508   called when a disk IO event needs to be cancelled
509 */
510 static int aio_destructor(struct aio_event *ae)
511 {
512         struct event_context *ev = ae->event_ctx;
513         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
514                                                            struct aio_event_context);
515         struct io_event result;
516         io_cancel(aio_ev->ioctx, &ae->iocb, &result);
517         /* TODO: handle errors from io_cancel()! */
518         return 0;
519 }
520
521 /* submit an aio disk IO event */
522 static struct aio_event *aio_event_add_aio(struct event_context *ev, 
523                                            TALLOC_CTX *mem_ctx,
524                                            struct iocb *iocb,
525                                            event_aio_handler_t handler,
526                                            void *private_data)
527 {
528         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
529                                                            struct aio_event_context);
530         struct iocb *iocbp;
531         struct aio_event *ae = talloc(mem_ctx?mem_ctx:ev, struct aio_event);
532         if (ae == NULL) return NULL;
533
534         ae->event_ctx    = ev;
535         ae->iocb         = *iocb;
536         ae->handler      = handler;
537         ae->private_data = private_data;
538         iocbp = &ae->iocb;
539
540         if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
541                 talloc_free(ae);
542                 return NULL;
543         }
544         ae->iocb.data = ae;
545         talloc_set_destructor(ae, aio_destructor);
546
547         return ae;
548 }
549
550 static const struct event_ops aio_event_ops = {
551         .context_init   = aio_event_context_init,
552         .add_fd         = aio_event_add_fd,
553         .add_aio        = aio_event_add_aio,
554         .get_fd_flags   = aio_event_get_fd_flags,
555         .set_fd_flags   = aio_event_set_fd_flags,
556         .add_timed      = common_event_add_timed,
557         .add_signal     = common_event_add_signal,
558         .loop_once      = aio_event_loop_once,
559         .loop_wait      = aio_event_loop_wait,
560 };
561
562 bool events_aio_init(void)
563 {
564         return event_register_backend("aio", &aio_event_ops);
565 }
566