add the event system from ctdb
[sahlberg/remote-cache.git] / lib / events / events_aio.c
1 /* 
2    Unix SMB/CIFS implementation.
3
4    main select loop and event handling - aio/epoll hybrid implementation
5
6    Copyright (C) Andrew Tridgell        2006
7
8    based on events_standard.c
9    
10    This program is free software; you can redistribute it and/or modify
11    it under the terms of the GNU General Public License as published by
12    the Free Software Foundation; either version 3 of the License, or
13    (at your option) any later version.
14    
15    This program is distributed in the hope that it will be useful,
16    but WITHOUT ANY WARRANTY; without even the implied warranty of
17    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18    GNU General Public License for more details.
19    
20    You should have received a copy of the GNU General Public License
21    along with this program.  If not, see <http://www.gnu.org/licenses/>.
22 */
23 /*
24   this is a very strange beast. The Linux AIO implementation doesn't
25   yet integrate properly with epoll, but there is a kernel patch that
26   allows the aio wait primitives to be used to wait for epoll events,
27   and this can be used to give us a unified event system incorporating
28   both aio events and epoll events
29
30   this is _very_ experimental code
31 */
32
33 #include "includes.h"
34 #include "system/filesys.h"
35 #include "system/network.h"
36 #include "lib/util/dlinklist.h"
37 #include "lib/events/events.h"
38 #include "lib/events/events_internal.h"
39 #include <sys/epoll.h>
40 #include <libaio.h>
41
42 #define MAX_AIO_QUEUE_DEPTH     100
43 #ifndef IOCB_CMD_EPOLL_WAIT
44 #define IOCB_CMD_EPOLL_WAIT     9
45 #endif
46
47 struct aio_event_context {
48         /* a pointer back to the generic event_context */
49         struct event_context *ev;
50
51         /* list of filedescriptor events */
52         struct fd_event *fd_events;
53
54         /* number of registered fd event handlers */
55         int num_fd_events;
56
57         uint32_t destruction_count;
58
59         io_context_t ioctx;
60
61         struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
62
63         struct iocb *epoll_iocb;
64
65         int epoll_fd;
66         int is_epoll_set;
67         pid_t pid;
68 };
69
70 struct aio_event {
71         struct event_context *event_ctx;
72         struct iocb iocb;
73         void *private_data;
74         event_aio_handler_t handler;
75 };
76
77 /*
78   map from EVENT_FD_* to EPOLLIN/EPOLLOUT
79 */
80 static uint32_t epoll_map_flags(uint16_t flags)
81 {
82         uint32_t ret = 0;
83         if (flags & EVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
84         if (flags & EVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
85         return ret;
86 }
87
88 /*
89  free the epoll fd
90 */
91 static int aio_ctx_destructor(struct aio_event_context *aio_ev)
92 {
93         io_queue_release(aio_ev->ioctx);
94         close(aio_ev->epoll_fd);
95         aio_ev->epoll_fd = -1;
96         return 0;
97 }
98
99 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde);
100
101 /*
102   reopen the epoll handle when our pid changes
103   see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for an 
104   demonstration of why this is needed
105  */
106 static void epoll_check_reopen(struct aio_event_context *aio_ev)
107 {
108         struct fd_event *fde;
109
110         if (aio_ev->pid == getpid()) {
111                 return;
112         }
113
114         close(aio_ev->epoll_fd);
115         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
116         if (aio_ev->epoll_fd == -1) {
117                 DEBUG(0,("Failed to recreate epoll handle after fork\n"));
118                 return;
119         }
120         aio_ev->pid = getpid();
121         for (fde=aio_ev->fd_events;fde;fde=fde->next) {
122                 epoll_add_event(aio_ev, fde);
123         }
124 }
125
126 #define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT      (1<<0)
127 #define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR   (1<<1)
128 #define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR      (1<<2)
129
130 /*
131  add the epoll event to the given fd_event
132 */
133 static void epoll_add_event(struct aio_event_context *aio_ev, struct fd_event *fde)
134 {
135         struct epoll_event event;
136         if (aio_ev->epoll_fd == -1) return;
137
138         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
139
140         /* if we don't want events yet, don't add an aio_event */
141         if (fde->flags == 0) return;
142
143         ZERO_STRUCT(event);
144         event.events = epoll_map_flags(fde->flags);
145         event.data.ptr = fde;
146         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
147         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
148
149         /* only if we want to read we want to tell the event handler about errors */
150         if (fde->flags & EVENT_FD_READ) {
151                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
152         }
153 }
154
155 /*
156  delete the epoll event for given fd_event
157 */
158 static void epoll_del_event(struct aio_event_context *aio_ev, struct fd_event *fde)
159 {
160         struct epoll_event event;
161
162         DLIST_REMOVE(aio_ev->fd_events, fde);
163
164         if (aio_ev->epoll_fd == -1) return;
165
166         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
167
168         /* if there's no aio_event, we don't need to delete it */
169         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
170
171         ZERO_STRUCT(event);
172         event.events = epoll_map_flags(fde->flags);
173         event.data.ptr = fde;
174         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
175
176         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
177 }
178
179 /*
180  change the epoll event to the given fd_event
181 */
182 static void epoll_mod_event(struct aio_event_context *aio_ev, struct fd_event *fde)
183 {
184         struct epoll_event event;
185         if (aio_ev->epoll_fd == -1) return;
186
187         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
188
189         ZERO_STRUCT(event);
190         event.events = epoll_map_flags(fde->flags);
191         event.data.ptr = fde;
192         epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
193
194         /* only if we want to read we want to tell the event handler about errors */
195         if (fde->flags & EVENT_FD_READ) {
196                 fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
197         }
198 }
199
200 static void epoll_change_event(struct aio_event_context *aio_ev, struct fd_event *fde)
201 {
202         bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
203         bool want_read = (fde->flags & EVENT_FD_READ);
204         bool want_write= (fde->flags & EVENT_FD_WRITE);
205
206         if (aio_ev->epoll_fd == -1) return;
207
208         fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
209
210         /* there's already an event */
211         if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
212                 if (want_read || (want_write && !got_error)) {
213                         epoll_mod_event(aio_ev, fde);
214                         return;
215                 }
216                 epoll_del_event(aio_ev, fde);
217                 return;
218         }
219
220         /* there's no aio_event attached to the fde */
221         if (want_read || (want_write && !got_error)) {
222                 DLIST_ADD(aio_ev->fd_events, fde);
223                 epoll_add_event(aio_ev, fde);
224                 return;
225         }
226 }
227
228 static int setup_epoll_wait(struct aio_event_context *aio_ev)
229 {
230         if (aio_ev->is_epoll_set) {
231                 return 0;
232         }
233         memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
234         aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
235         aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
236         aio_ev->epoll_iocb->aio_reqprio = 0;
237
238         aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
239         aio_ev->epoll_iocb->u.c.offset = -1;
240         aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
241
242         if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
243                 return -1;
244         }
245         aio_ev->is_epoll_set = 1;
246
247         return 0;
248 }
249
250
251 /*
252   event loop handling using aio/epoll hybrid
253 */
254 static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
255 {
256         int ret, i;
257         uint32_t destruction_count = ++aio_ev->destruction_count;
258         struct timespec timeout;
259         struct io_event events[8];
260
261         if (aio_ev->epoll_fd == -1) return -1;
262
263         if (aio_ev->ev->num_signal_handlers && 
264             common_event_check_signal(aio_ev->ev)) {
265                 return 0;
266         }
267
268         if (tvalp) {
269                 timeout.tv_sec = tvalp->tv_sec;
270                 timeout.tv_nsec = tvalp->tv_usec;
271                 timeout.tv_nsec *= 1000;
272         }
273
274         if (setup_epoll_wait(aio_ev) < 0) 
275                 return -1;
276
277         ret = io_getevents(aio_ev->ioctx, 1, 8,
278                            events, tvalp?&timeout:NULL);
279
280         if (ret == -EINTR) {
281                 if (aio_ev->ev->num_signal_handlers) {
282                         common_event_check_signal(aio_ev->ev);
283                 }
284                 return 0;
285         }
286
287         if (ret == 0 && tvalp) {
288                 /* we don't care about a possible delay here */
289                 common_event_loop_timer_delay(aio_ev->ev);
290                 return 0;
291         }
292
293         for (i=0;i<ret;i++) {
294                 struct io_event *event = &events[i];
295                 struct iocb *finished = event->obj;
296
297                 switch (finished->aio_lio_opcode) {
298                 case IO_CMD_PWRITE:
299                 case IO_CMD_PREAD: {
300                         struct aio_event *ae = talloc_get_type(finished->data, 
301                                                                struct aio_event);
302                         if (ae) {
303                                 talloc_set_destructor(ae, NULL);
304                                 ae->handler(ae->event_ctx, ae, 
305                                             event->res, ae->private_data);
306                                 talloc_free(ae);
307                         }
308                         break;
309                 }
310                 case IOCB_CMD_EPOLL_WAIT: {
311                         struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
312                         struct fd_event *fde;
313                         uint16_t flags = 0;
314                         int j;
315
316                         aio_ev->is_epoll_set = 0;
317
318                         for (j=0; j<event->res; j++, ep++) {
319                                 fde = talloc_get_type(ep->data.ptr, 
320                                                       struct fd_event);
321                                 if (fde == NULL) {
322                                         return -1;
323                                 }
324                                 if (ep->events & (EPOLLHUP|EPOLLERR)) {
325                                         fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
326                                         if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
327                                                 epoll_del_event(aio_ev, fde);
328                                                 continue;
329                                         }
330                                         flags |= EVENT_FD_READ;
331                                 }
332                                 if (ep->events & EPOLLIN) flags |= EVENT_FD_READ;
333                                 if (ep->events & EPOLLOUT) flags |= EVENT_FD_WRITE;
334                                 if (flags) {
335                                         fde->handler(aio_ev->ev, fde, flags, fde->private_data);
336                                 }
337                         }
338                         break;
339                 }
340                 }
341                 if (destruction_count != aio_ev->destruction_count) {
342                         return 0;
343                 }
344         }
345
346         return 0;
347 }
348
349 /*
350   create a aio_event_context structure.
351 */
352 static int aio_event_context_init(struct event_context *ev)
353 {
354         struct aio_event_context *aio_ev;
355         
356         aio_ev = talloc_zero(ev, struct aio_event_context);
357         if (!aio_ev) return -1;
358
359         aio_ev->ev = ev;
360         aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
361
362         if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
363                 talloc_free(aio_ev);
364                 return -1;
365         }
366
367         aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
368         if (aio_ev->epoll_fd == -1) {
369                 talloc_free(aio_ev);
370                 return -1;
371         }
372         aio_ev->pid = getpid();
373
374         talloc_set_destructor(aio_ev, aio_ctx_destructor);
375
376         ev->additional_data = aio_ev;
377
378         if (setup_epoll_wait(aio_ev) < 0) {
379                 talloc_free(aio_ev);
380                 return -1;
381         }
382
383         return 0;
384 }
385
386 /*
387   destroy an fd_event
388 */
389 static int aio_event_fd_destructor(struct fd_event *fde)
390 {
391         struct event_context *ev = fde->event_ctx;
392         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
393                                                            struct aio_event_context);
394
395         epoll_check_reopen(aio_ev);
396
397         aio_ev->num_fd_events--;
398         aio_ev->destruction_count++;
399
400         epoll_del_event(aio_ev, fde);
401
402         if (fde->flags & EVENT_FD_AUTOCLOSE) {
403                 close(fde->fd);
404                 fde->fd = -1;
405         }
406
407         return 0;
408 }
409
410 /*
411   add a fd based event
412   return NULL on failure (memory allocation error)
413 */
414 static struct fd_event *aio_event_add_fd(struct event_context *ev, TALLOC_CTX *mem_ctx,
415                                          int fd, uint16_t flags,
416                                          event_fd_handler_t handler,
417                                          void *private_data)
418 {
419         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
420                                                            struct aio_event_context);
421         struct fd_event *fde;
422
423         epoll_check_reopen(aio_ev);
424
425         fde = talloc(mem_ctx?mem_ctx:ev, struct fd_event);
426         if (!fde) return NULL;
427
428         fde->event_ctx          = ev;
429         fde->fd                 = fd;
430         fde->flags              = flags;
431         fde->handler            = handler;
432         fde->private_data       = private_data;
433         fde->additional_flags   = 0;
434         fde->additional_data    = NULL;
435
436         aio_ev->num_fd_events++;
437         talloc_set_destructor(fde, aio_event_fd_destructor);
438
439         DLIST_ADD(aio_ev->fd_events, fde);
440         epoll_add_event(aio_ev, fde);
441
442         return fde;
443 }
444
445
446 /*
447   return the fd event flags
448 */
449 static uint16_t aio_event_get_fd_flags(struct fd_event *fde)
450 {
451         return fde->flags;
452 }
453
454 /*
455   set the fd event flags
456 */
457 static void aio_event_set_fd_flags(struct fd_event *fde, uint16_t flags)
458 {
459         struct event_context *ev;
460         struct aio_event_context *aio_ev;
461
462         if (fde->flags == flags) return;
463
464         ev = fde->event_ctx;
465         aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
466
467         fde->flags = flags;
468
469         epoll_check_reopen(aio_ev);
470
471         epoll_change_event(aio_ev, fde);
472 }
473
474 /*
475   do a single event loop using the events defined in ev 
476 */
477 static int aio_event_loop_once(struct event_context *ev)
478 {
479         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
480                                                            struct aio_event_context);
481         struct timeval tval;
482
483         tval = common_event_loop_timer_delay(ev);
484         if (timeval_is_zero(&tval)) {
485                 return 0;
486         }
487
488         epoll_check_reopen(aio_ev);
489
490         return aio_event_loop(aio_ev, &tval);
491 }
492
493 /*
494   return on failure or (with 0) if all fd events are removed
495 */
496 static int aio_event_loop_wait(struct event_context *ev)
497 {
498         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
499                                                            struct aio_event_context);
500         while (aio_ev->num_fd_events) {
501                 if (aio_event_loop_once(ev) != 0) {
502                         break;
503                 }
504         }
505
506         return 0;
507 }
508
509 /*
510   called when a disk IO event needs to be cancelled
511 */
512 static int aio_destructor(struct aio_event *ae)
513 {
514         struct event_context *ev = ae->event_ctx;
515         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
516                                                            struct aio_event_context);
517         struct io_event result;
518         io_cancel(aio_ev->ioctx, &ae->iocb, &result);
519         /* TODO: handle errors from io_cancel()! */
520         return 0;
521 }
522
523 /* submit an aio disk IO event */
524 static struct aio_event *aio_event_add_aio(struct event_context *ev, 
525                                            TALLOC_CTX *mem_ctx,
526                                            struct iocb *iocb,
527                                            event_aio_handler_t handler,
528                                            void *private_data)
529 {
530         struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
531                                                            struct aio_event_context);
532         struct iocb *iocbp;
533         struct aio_event *ae = talloc(mem_ctx?mem_ctx:ev, struct aio_event);
534         if (ae == NULL) return NULL;
535
536         ae->event_ctx    = ev;
537         ae->iocb         = *iocb;
538         ae->handler      = handler;
539         ae->private_data = private_data;
540         iocbp = &ae->iocb;
541
542         if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
543                 talloc_free(ae);
544                 return NULL;
545         }
546         ae->iocb.data = ae;
547         talloc_set_destructor(ae, aio_destructor);
548
549         return ae;
550 }
551
552 static const struct event_ops aio_event_ops = {
553         .context_init   = aio_event_context_init,
554         .add_fd         = aio_event_add_fd,
555         .add_aio        = aio_event_add_aio,
556         .get_fd_flags   = aio_event_get_fd_flags,
557         .set_fd_flags   = aio_event_set_fd_flags,
558         .add_timed      = common_event_add_timed,
559         .add_signal     = common_event_add_signal,
560         .loop_once      = aio_event_loop_once,
561         .loop_wait      = aio_event_loop_wait,
562 };
563
564 bool events_aio_init(void)
565 {
566         return event_register_backend("aio", &aio_event_ops);
567 }
568
569 #if _SAMBA_BUILD_
570 NTSTATUS s4_events_aio_init(void)
571 {
572         if (!events_aio_init()) {
573                 return NT_STATUS_INTERNAL_ERROR;
574         }
575         return NT_STATUS_OK;
576 }
577 #endif