r20788: - remove epoll configure checks from libreplace
[metze/samba/wip.git] / source / lib / events / events_aio.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    main select loop and event handling - aio/epoll hybrid implementation
5
6    Copyright (C) Andrew Tridgell        2006
7
8    based on events_standard.c
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 2 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program; if not, write to the Free Software
22    Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
23 */
24 /*
25   this is a very strange beast. The Linux AIO implementation doesn't
26   yet integrate properly with epoll, but there is a kernel patch that
27   allows the aio wait primitives to be used to wait for epoll events,
28   and this can be used to give us a unified event system incorporating
29   both aio events and epoll events
30
31   this is _very_ experimental code
32 */
33
34 #include "includes.h"
35 #include "system/filesys.h"
36 #include "lib/util/dlinklist.h"
37 #include "lib/events/events.h"
38 #include "lib/events/events_internal.h"
39 #include <sys/epoll.h>
40 #include <libaio.h>
41
42 #define MAX_AIO_QUEUE_DEPTH     100
43 #ifndef IOCB_CMD_EPOLL_WAIT
44 #define IOCB_CMD_EPOLL_WAIT     9
45 #endif
46
47 struct aio_event_context {
48         /* a pointer back to the generic event_context */
49         struct event_context *ev;
50
51         /* number of registered fd event handlers */
52         int num_fd_events;
53
54         uint32_t destruction_count;
55
56         io_context_t ioctx;
57
58         struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
59
60         struct iocb *epoll_iocb;
61
62         int epoll_fd;
63         int is_epoll_set;
64 };
65
66 struct aio_event {
67         struct event_context *event_ctx;
68         struct iocb iocb;
69         void *private_data;
70         event_aio_handler_t handler;
71 };
72
73 /*
74   map from EVENT_FD_* to EPOLLIN/EPOLLOUT
75 */
76 static uint32_t epoll_map_flags(uint16_t flags)
77 {
78         uint32_t ret = 0;
79         if (flags & EVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
80         if (flags & EVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
81         return ret;
82 }
83
84 /*
85  free the epoll fd
86 */
87 static int aio_ctx_destructor(struct aio_event_context *aio_ev)
88 {
89         io_queue_release(aio_ev->ioctx);
90         close(aio_ev->epoll_fd);
91         aio_ev->epoll_fd = -1;
92         return 0;
93 }
94
95 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT      (1<<0)
96 #define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR   (1<<1)
97 #define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR      (1<<2)
98
99 /*
100  add the epoll event to the given fd_event
101 */
102 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde)
103 {
104         struct epoll_event event;
105         if (aio_ev->epoll_fd == -1) return;
106
107         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
108
109         /* if we don't want events yet, don't add an aio_event */
110         if (fde->flags == 0) return;
111
112         ZERO_STRUCT(event);
113         event.events = epoll_map_flags(fde->flags);
114         event.data.ptr = fde;
115         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
116         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
117
118         /* only if we want to read we want to tell the event handler about errors */
119         if (fde->flags & EVENT_FD_READ) {
120                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
121         }
122 }
123
124 /*
125  delete the epoll event for given fd_event
126 */
127 static void epoll_del_event(struct aio_event_context *aio_ev, struct fd_event *fde)
128 {
129         struct epoll_event event;
130         if (aio_ev->epoll_fd == -1) return;
131
132         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
133
134         /* if there's no aio_event, we don't need to delete it */
135         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
136
137         ZERO_STRUCT(event);
138         event.events = epoll_map_flags(fde->flags);
139         event.data.ptr = fde;
140         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
141
142         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
143 }
144
145 /*
146  change the epoll event to the given fd_event
147 */
148 static void epoll_mod_event(struct aio_event_context *aio_ev, struct fd_event *fde)
149 {
150         struct epoll_event event;
151         if (aio_ev->epoll_fd == -1) return;
152
153         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
154
155         ZERO_STRUCT(event);
156         event.events = epoll_map_flags(fde->flags);
157         event.data.ptr = fde;
158         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
159
160         /* only if we want to read we want to tell the event handler about errors */
161         if (fde->flags & EVENT_FD_READ) {
162                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
163         }
164 }
165
166 static void epoll_change_event(struct aio_event_context *aio_ev, struct fd_event *fde)
167 {
168         BOOL got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
169         BOOL want_read = (fde->flags & EVENT_FD_READ);
170         BOOL want_write= (fde->flags & EVENT_FD_WRITE);
171
172         if (aio_ev->epoll_fd == -1) return;
173
174         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
175
176         /* there's already an event */
177         if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
178                 if (want_read || (want_write && !got_error)) {
179                         epoll_mod_event(aio_ev, fde);
180                         return;
181                 }
182                 epoll_del_event(aio_ev, fde);
183                 return;
184         }
185
186         /* there's no aio_event attached to the fde */
187         if (want_read || (want_write && !got_error)) {
188                 epoll_add_event(aio_ev, fde);
189                 return;
190         }
191 }
192
193 static int setup_epoll_wait(struct aio_event_context *aio_ev)
194 {
195         if (aio_ev->is_epoll_set) {
196                 return 0;
197         }
198         memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
199         aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
200         aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
201         aio_ev->epoll_iocb->aio_reqprio = 0;
202
203         aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
204         aio_ev->epoll_iocb->u.c.offset = -1;
205         aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
206
207         aio_ev->is_epoll_set = 1;
208         if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
209                 return -1;
210         }
211
212         return 0;
213 }
214
215
216 /*
217   event loop handling using aio/epoll hybrid
218 */
219 static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
220 {
221         int ret, i;
222         uint32_t destruction_count = aio_ev->destruction_count;
223         struct timespec timeout;
224         struct io_event events[8];
225
226         if (aio_ev->epoll_fd == -1) return -1;
227
228         if (tvalp) {
229                 timeout.tv_sec = tvalp->tv_sec;
230                 timeout.tv_nsec = tvalp->tv_usec;
231                 timeout.tv_nsec *= 1000;
232         }
233
234         if (setup_epoll_wait(aio_ev) < 0) 
235                 return -1;
236
237         ret = io_getevents(aio_ev->ioctx, 1, 8,
238                            events, tvalp?&timeout:NULL);
239         if (ret == -EINTR) {
240                 return 0;
241         }
242
243         if (ret == 0 && tvalp) {
244                 common_event_loop_timer(aio_ev->ev);
245                 return 0;
246         }
247
248         for (i=0;i<ret;i++) {
249                 struct io_event *event = &events[i];
250                 struct iocb *finished = event->obj;
251
252                 switch (finished->aio_lio_opcode) {
253                 case IO_CMD_PWRITE:
254                 case IO_CMD_PREAD: {
255                         struct aio_event *ae = talloc_get_type(finished->data, 
256                                                                struct aio_event);
257                         if (ae) {
258                                 talloc_set_destructor(ae, NULL);
259                                 ae->handler(ae->event_ctx, ae, 
260                                             event->res, ae->private_data);
261                                 talloc_free(ae);
262                         }
263                         break;
264                 }
265                 case IOCB_CMD_EPOLL_WAIT: {
266                         struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
267                         struct fd_event *fde;
268                         uint16_t flags = 0;
269                         int j;
270
271                         aio_ev->is_epoll_set = 0;
272
273                         for (j=0; j<event->res; j++, ep++) {
274                                 fde = talloc_get_type(ep->data.ptr, 
275                                                       struct fd_event);
276                                 if (fde == NULL) {
277                                         return -1;
278                                 }
279                                 if (ep->events & (EPOLLHUP|EPOLLERR)) {
280                                         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
281                                         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
282                                                 epoll_del_event(aio_ev, fde);
283                                                 continue;
284                                         }
285                                         flags |= EVENT_FD_READ;
286                                 }
287                                 if (ep->events & EPOLLIN) flags |= EVENT_FD_READ;
288                                 if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE;
289                                 if (flags) {
290                                         fde->handler(aio_ev->ev, fde, flags, fde->private_data);
291                                 }
292                         }
293                         break;
294                 }
295                 }
296                 if (destruction_count != aio_ev->destruction_count) {
297                         return 0;
298                 }
299         }
300
301         return 0;
302 }
303
304 /*
305   create a aio_event_context structure.
306 */
307 static int aio_event_context_init(struct event_context *ev)
308 {
309         struct aio_event_context *aio_ev;
310         
311         aio_ev = talloc_zero(ev, struct aio_event_context);
312         if (!aio_ev) return -1;
313
314         aio_ev->ev = ev;
315         aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
316
317         if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
318                 return -1;
319         }
320
321         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
322         if (aio_ev->epoll_fd == -1) return -1;
323
324         talloc_set_destructor(aio_ev, aio_ctx_destructor);
325
326         ev->additional_data = aio_ev;
327         return 0;
328 }
329
330 /*
331   destroy an fd_event
332 */
333 static int aio_event_fd_destructor(struct fd_event *fde)
334 {
335         struct event_context *ev = fde->event_ctx;
336         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
337                                                            struct aio_event_context);
338
339         aio_ev->num_fd_events--;
340         aio_ev->destruction_count++;
341
342         epoll_del_event(aio_ev, fde);
343
344         return 0;
345 }
346
347 /*
348   add a fd based event
349   return NULL on failure (memory allocation error)
350 */
351 static struct fd_event *aio_event_add_fd(struct event_context *ev, TALLOC_CTX *mem_ctx,
352                                          int fd, uint16_t flags,
353                                          event_fd_handler_t handler,
354                                          void *private_data)
355 {
356         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
357                                                            struct aio_event_context);
358         struct fd_event *fde;
359
360         fde = talloc(mem_ctx?mem_ctx:ev, struct fd_event);
361         if (!fde) return NULL;
362
363         fde->event_ctx          = ev;
364         fde->fd                 = fd;
365         fde->flags              = flags;
366         fde->handler            = handler;
367         fde->private_data       = private_data;
368         fde->additional_flags   = 0;
369         fde->additional_data    = NULL;
370
371         aio_ev->num_fd_events++;
372         talloc_set_destructor(fde, aio_event_fd_destructor);
373
374         epoll_add_event(aio_ev, fde);
375
376         return fde;
377 }
378
379
380 /*
381   return the fd event flags
382 */
383 static uint16_t aio_event_get_fd_flags(struct fd_event *fde)
384 {
385         return fde->flags;
386 }
387
388 /*
389   set the fd event flags
390 */
391 static void aio_event_set_fd_flags(struct fd_event *fde, uint16_t flags)
392 {
393         struct event_context *ev;
394         struct aio_event_context *aio_ev;
395
396         if (fde->flags == flags) return;
397
398         ev = fde->event_ctx;
399         aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
400
401         fde->flags = flags;
402
403         epoll_change_event(aio_ev, fde);
404 }
405
406 /*
407   do a single event loop using the events defined in ev 
408 */
409 static int aio_event_loop_once(struct event_context *ev)
410 {
411         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
412                                                            struct aio_event_context);
413         struct timeval tval;
414
415         tval = common_event_loop_delay(ev);
416
417         if (timeval_is_zero(&tval)) {
418                 common_event_loop_timer(ev);
419                 return 0;
420         }
421
422         return aio_event_loop(aio_ev, &tval);
423 }
424
425 /*
426   return on failure or (with 0) if all fd events are removed
427 */
428 static int aio_event_loop_wait(struct event_context *ev)
429 {
430         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
431                                                            struct aio_event_context);
432         while (aio_ev->num_fd_events) {
433                 if (aio_event_loop_once(ev) != 0) {
434                         break;
435                 }
436         }
437
438         return 0;
439 }
440
441 /*
442   called when a disk IO event needs to be cancelled
443 */
444 static int aio_destructor(struct aio_event *ae)
445 {
446         struct event_context *ev = ae->event_ctx;
447         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
448                                                            struct aio_event_context);
449         struct io_event result;
450         io_cancel(aio_ev->ioctx, &ae->iocb, &result);
451         /* TODO: handle errors from io_cancel()! */
452         return 0;
453 }
454
455 /* submit an aio disk IO event */
456 static struct aio_event *aio_event_add_aio(struct event_context *ev, 
457                                            TALLOC_CTX *mem_ctx,
458                                            struct iocb *iocb,
459                                            event_aio_handler_t handler,
460                                            void *private_data)
461 {
462         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
463                                                            struct aio_event_context);
464         struct iocb *iocbp;
465         struct aio_event *ae = talloc(mem_ctx?mem_ctx:ev, struct aio_event);
466         if (ae == NULL) return NULL;
467
468         ae->event_ctx    = ev;
469         ae->iocb         = *iocb;
470         ae->handler      = handler;
471         ae->private_data = private_data;
472         iocbp = &ae->iocb;
473
474         if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
475                 talloc_free(ae);
476                 return NULL;
477         }
478         ae->iocb.data = ae;
479         talloc_set_destructor(ae, aio_destructor);
480
481         return ae;
482 }
483
484 static const struct event_ops aio_event_ops = {
485         .context_init   = aio_event_context_init,
486         .add_fd         = aio_event_add_fd,
487         .add_aio        = aio_event_add_aio,
488         .get_fd_flags   = aio_event_get_fd_flags,
489         .set_fd_flags   = aio_event_set_fd_flags,
490         .add_timed      = common_event_add_timed,
491         .loop_once      = aio_event_loop_once,
492         .loop_wait      = aio_event_loop_wait,
493 };
494
495 NTSTATUS events_aio_init(void)
496 {
497         return event_register_backend("aio", &aio_event_ops);
498 }