TODO/UNTESTED: tevent: add kqueue backend
[metze/samba/wip.git] / lib/tevent/tevent_kqueue.c
/*
   Unix SMB/CIFS implementation.

   main select loop and event handling - kqueue implementation

   Copyright (C) Stefan Metzmacher 2013

     ** NOTE! The following LGPL license applies to the tevent
     ** library. This does NOT imply that all of Samba is released
     ** under the LGPL

   This library is free software; you can redistribute it and/or
   modify it under the terms of the GNU Lesser General Public
   License as published by the Free Software Foundation; either
   version 3 of the License, or (at your option) any later version.

   This library is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
   Lesser General Public License for more details.

   You should have received a copy of the GNU Lesser General Public
   License along with this library; if not, see <http://www.gnu.org/licenses/>.
*/

#include "replace.h"
#include "system/filesys.h"
#include "system/time.h"
#ifdef HAVE_SYS_EVENT_H
#include <sys/event.h>
#endif
#include "tevent.h"
#include "tevent_internal.h"
#include "tevent_util.h"

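/*
  the private state of the kqueue backend, it hangs off
  ev->additional_data of the generic tevent_context
*/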
struct kqueue_event_context {
        /* a pointer back to the generic event_context */
        struct tevent_context *ev;

        pid_t pid;
        int kqueue_fd;

        bool panic_force_replay;
        bool *panic_state;
        bool (*panic_fallback)(struct tevent_context *ev, bool replay);
};

#define KQUEUE_ADDITIONAL_FD_FLAG_HAS_READ              (1<<0)
#define KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_READ         (1<<1)
#define KQUEUE_ADDITIONAL_FD_FLAG_HAS_WRITE             (1<<2)
#define KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_WRITE        (1<<3)
#define KQUEUE_ADDITIONAL_FD_FLAG_REPORT_ERROR          (1<<4)
#define KQUEUE_ADDITIONAL_FD_FLAG_GOT_ERROR             (1<<5)
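
/*
 * These bits are tracked per fde in fde->additional_flags.
 *
 * kqueue registers EVFILT_READ and EVFILT_WRITE as two separate
 * filters per file descriptor, so the HAS_READ/HAS_WRITE and
 * DISABLED_READ/DISABLED_WRITE bits record which filters are
 * currently added or disabled on the kqueue. REPORT_ERROR is set
 * while the read filter is active and means EV_EOF is passed to
 * the handler as TEVENT_FD_READ, matching the select() behaviour
 * of only reporting errors when waiting for read events.
 * GOT_ERROR records that EV_EOF was seen on the descriptor.
 */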

/*
  called to set the panic fallback function.
*/
_PRIVATE_ bool tevent_kqueue_set_panic_fallback(struct tevent_context *ev,
                                bool (*panic_fallback)(struct tevent_context *ev,
                                                       bool replay))
{
        struct kqueue_event_context *kqueue_ev;

        if (ev->additional_data == NULL) {
                return false;
        }

        kqueue_ev = talloc_get_type(ev->additional_data,
                                    struct kqueue_event_context);
        if (kqueue_ev == NULL) {
                return false;
        }
        kqueue_ev->panic_fallback = panic_fallback;
        return true;
}

/*
  called when a kqueue call fails
*/
static void kqueue_panic(struct kqueue_event_context *kqueue_ev,
                         const char *reason, bool replay)
{
        struct tevent_context *ev = kqueue_ev->ev;
        bool (*panic_fallback)(struct tevent_context *ev, bool replay);

        panic_fallback = kqueue_ev->panic_fallback;

        if (kqueue_ev->panic_state != NULL) {
                *kqueue_ev->panic_state = true;
        }

        if (kqueue_ev->panic_force_replay) {
                replay = true;
        }

        TALLOC_FREE(ev->additional_data);

        if (panic_fallback == NULL) {
                tevent_debug(ev, TEVENT_DEBUG_FATAL,
                        "%s (%s) replay[%u] - calling abort()\n",
                        reason, strerror(errno), (unsigned)replay);
                abort();
        }

        tevent_debug(ev, TEVENT_DEBUG_ERROR,
                     "%s (%s) replay[%u] - calling panic_fallback\n",
                     reason, strerror(errno), (unsigned)replay);

        if (!panic_fallback(ev, replay)) {
                /* Fallback failed. */
                tevent_debug(ev, TEVENT_DEBUG_FATAL,
                        "%s (%s) replay[%u] - calling abort()\n",
                        reason, strerror(errno), (unsigned)replay);
                abort();
        }
}

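/*
  tear down the kqueue backend state: detach it from the
  tevent_context and close the kqueue file descriptor
*/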
static int kqueue_ctx_destructor(struct kqueue_event_context *kqueue_ev)
{
        kqueue_ev->ev->additional_data = NULL;

        if (kqueue_ev->kqueue_fd != -1) {
                close(kqueue_ev->kqueue_fd);
        }
        kqueue_ev->kqueue_fd = -1;

        return 0;
}

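/*
  create a kqueue_event_context structure.
*/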
static int kqueue_event_context_init(struct tevent_context *ev)
{
        struct kqueue_event_context *kqueue_ev;

        /*
         * We might be called during tevent_re_initialise()
         * which means we need to free our old additional_data.
         */
        TALLOC_FREE(ev->additional_data);

        kqueue_ev = talloc_zero(ev, struct kqueue_event_context);
        if (kqueue_ev == NULL) {
                return -1;
        }
        kqueue_ev->ev = ev;
        kqueue_ev->pid = getpid();
        kqueue_ev->kqueue_fd = -1;

        talloc_set_destructor(kqueue_ev, kqueue_ctx_destructor);

        kqueue_ev->kqueue_fd = kqueue();
        if (kqueue_ev->kqueue_fd == -1) {
                TALLOC_FREE(kqueue_ev);
                tevent_debug(ev, TEVENT_DEBUG_FATAL,
                             "Failed to create kqueue() handle.\n");
                return -1;
        }

        if (!ev_set_close_on_exec(kqueue_ev->kqueue_fd)) {
                tevent_debug(kqueue_ev->ev, TEVENT_DEBUG_WARNING,
                             "kqueue_fd pid[%d] - failed to set close-on-exec, "
                             "file descriptor may be leaked to children.\n",
                             kqueue_ev->pid);
        }

        ev->additional_data = kqueue_ev;
        return 0;
}

static void kqueue_update_fd_event(struct kqueue_event_context *kqueue_ev,
                                   struct tevent_fd *fde);

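/*
  a kqueue descriptor is not inherited across fork(), so if the pid
  has changed we have to open a fresh kqueue and re-register all
  existing fd events on it
*/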
static void kqueue_check_reopen(struct kqueue_event_context *kqueue_ev)
{
        struct tevent_fd *fde;
        bool *caller_panic_state = kqueue_ev->panic_state;
        bool panic_triggered = false;

        if (kqueue_ev->pid == getpid()) {
                return;
        }
        kqueue_ev->pid = getpid();

        close(kqueue_ev->kqueue_fd);
        kqueue_ev->kqueue_fd = kqueue();
        if (kqueue_ev->kqueue_fd == -1) {
                kqueue_panic(kqueue_ev, "kqueue() failed", false);
                return;
        }

        if (!ev_set_close_on_exec(kqueue_ev->kqueue_fd)) {
                tevent_debug(kqueue_ev->ev, TEVENT_DEBUG_WARNING,
                             "kqueue_fd pid[%d] - failed to set close-on-exec, "
                             "file descriptor may be leaked to children.\n",
                             kqueue_ev->pid);
        }

        kqueue_ev->panic_state = &panic_triggered;
        for (fde=kqueue_ev->ev->fd_events;fde;fde=fde->next) {
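                /*
                 * The new kqueue has no filters registered yet, so
                 * keep only the GOT_ERROR bit and let
                 * kqueue_update_fd_event() re-add the filters.
                 */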
                fde->additional_flags &= KQUEUE_ADDITIONAL_FD_FLAG_GOT_ERROR;
                kqueue_update_fd_event(kqueue_ev, fde);

                if (panic_triggered) {
                        if (caller_panic_state != NULL) {
                                *caller_panic_state = true;
                        }
                        return;
                }
        }
        kqueue_ev->panic_state = NULL;
}

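/*
  map the tevent flags of an fde onto kqueue changelist operations

  Depending on the state recorded in fde->additional_flags this
  adds, enables, disables or deletes the EVFILT_READ and
  EVFILT_WRITE filters via EV_SET()/kevent(). EV_SET() fills a
  struct kevent as (ident, filter, flags, fflags, data, udata);
  the fde pointer is stored as udata so that the event loop can
  map a returned kevent back to its fde.
*/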
static void kqueue_update_fd_event(struct kqueue_event_context *kqueue_ev,
                                   struct tevent_fd *fde)
{
        bool add_read = false;
        bool enable_read = false;
        bool disable_read = false;
        bool delete_read = false;
        bool add_write = false;
        bool enable_write = false;
        bool disable_write = false;
        bool delete_write = false;
        bool got_error = (fde->additional_flags & KQUEUE_ADDITIONAL_FD_FLAG_GOT_ERROR);
        struct kevent kev;
        int ret;

        if (fde->additional_flags & KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_WRITE) {
                if (got_error) {
                        delete_write = true;
                } else if (fde->flags & TEVENT_FD_WRITE) {
                        enable_write = true;
                } else if (fde->flags == 0) {
                        delete_write = true;
                }
        } else if (fde->additional_flags & KQUEUE_ADDITIONAL_FD_FLAG_HAS_WRITE) {
                if (got_error) {
                        delete_write = true;
                } else if (fde->flags == 0) {
                        delete_write = true;
                } else if (!(fde->flags & TEVENT_FD_WRITE)) {
                        disable_write = true;
                }
        } else {
                if (got_error) {
                        /* nothing */
                } else if (fde->flags & TEVENT_FD_WRITE) {
                        add_write = true;
                }
        }

        if (fde->additional_flags & KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_READ) {
                if (fde->flags & TEVENT_FD_READ) {
                        enable_read = true;
                } else if (fde->flags == 0) {
                        delete_read = true;
                }
        } else if (fde->additional_flags & KQUEUE_ADDITIONAL_FD_FLAG_HAS_READ) {
                if (fde->flags == 0) {
                        delete_read = true;
                } else if (!(fde->flags & TEVENT_FD_READ)) {
                        disable_read = true;
                }
        } else {
                if (fde->flags & TEVENT_FD_READ) {
                        add_read = true;
                }
        }

        if (add_write) {
                EV_SET(&kev, fde->fd, EVFILT_WRITE, EV_ADD,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_WRITE EV_ADD failed",
                                     false);
                        return;
                }

                fde->additional_flags |= KQUEUE_ADDITIONAL_FD_FLAG_HAS_WRITE;
        } else if (enable_write) {
                EV_SET(&kev, fde->fd, EVFILT_WRITE, EV_ENABLE,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_WRITE EV_ENABLE failed",
                                     false);
                        return;
                }

                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_WRITE;
        } else if (disable_write) {
                EV_SET(&kev, fde->fd, EVFILT_WRITE, EV_DISABLE,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_WRITE EV_DISABLE failed",
                                     false);
                        return;
                }

                fde->additional_flags |= KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_WRITE;
        } else if (delete_write) {
                EV_SET(&kev, fde->fd, EVFILT_WRITE, EV_DELETE,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_WRITE EV_DELETE failed",
                                     false);
                        return;
                }

                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_HAS_WRITE;
                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_WRITE;
        }

        if (add_read) {
                EV_SET(&kev, fde->fd, EVFILT_READ, EV_ADD,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_READ EV_ADD failed",
                                     false);
                        return;
                }

                fde->additional_flags |= KQUEUE_ADDITIONAL_FD_FLAG_HAS_READ;
                fde->additional_flags |= KQUEUE_ADDITIONAL_FD_FLAG_REPORT_ERROR;
        } else if (enable_read) {
                EV_SET(&kev, fde->fd, EVFILT_READ, EV_ENABLE,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_READ EV_ENABLE failed",
                                     false);
                        return;
                }

                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_READ;
                fde->additional_flags |= KQUEUE_ADDITIONAL_FD_FLAG_REPORT_ERROR;
        } else if (disable_read) {
                EV_SET(&kev, fde->fd, EVFILT_READ, EV_DISABLE,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_READ EV_DISABLE failed",
                                     false);
                        return;
                }

                fde->additional_flags |= KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_READ;
                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_REPORT_ERROR;
        } else if (delete_read) {
                EV_SET(&kev, fde->fd, EVFILT_READ, EV_DELETE,
                       0 /* fflags */, 0 /* data */,
                       fde);
                ret = kevent(kqueue_ev->kqueue_fd, &kev, 1, NULL, 0, NULL);
                if (ret != 0) {
                        kqueue_panic(kqueue_ev, "EVFILT_READ EV_DELETE failed",
                                     false);
                        return;
                }

                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_HAS_READ;
                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_DISABLED_READ;
                fde->additional_flags &= ~KQUEUE_ADDITIONAL_FD_FLAG_REPORT_ERROR;
        }
}

/*
  destroy an fd_event
*/
static int kqueue_event_fd_destructor(struct tevent_fd *fde)
{
        struct tevent_context *ev = fde->event_ctx;
        struct kqueue_event_context *kqueue_ev = NULL;
        bool panic_triggered = false;
        int flags = fde->flags;

        if (ev == NULL) {
                return tevent_common_fd_destructor(fde);
        }

        kqueue_ev = talloc_get_type_abort(ev->additional_data,
                                          struct kqueue_event_context);

        /*
         * we must remove the event from the list
         * otherwise a panic fallback handler may
         * reuse invalid memory
         */
        DLIST_REMOVE(ev->fd_events, fde);

        kqueue_ev->panic_state = &panic_triggered;
        kqueue_check_reopen(kqueue_ev);
        if (panic_triggered) {
                return tevent_common_fd_destructor(fde);
        }

        fde->flags = 0;
        kqueue_update_fd_event(kqueue_ev, fde);
        fde->flags = flags;
        if (panic_triggered) {
                return tevent_common_fd_destructor(fde);
        }
        kqueue_ev->panic_state = NULL;

        return tevent_common_fd_destructor(fde);
}

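/*
  add a fd based event
  return NULL on failure (memory allocation error)
*/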
static struct tevent_fd *kqueue_event_add_fd(struct tevent_context *ev, TALLOC_CTX *mem_ctx,
                                            int fd, uint16_t flags,
                                            tevent_fd_handler_t handler,
                                            void *private_data,
                                            const char *handler_name,
                                            const char *location)
{
        struct kqueue_event_context *kqueue_ev =
                talloc_get_type_abort(ev->additional_data,
                struct kqueue_event_context);
        struct tevent_fd *fde;
        bool panic_triggered = false;

        fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
                                   handler, private_data,
                                   handler_name, location);
        if (fde == NULL) {
                return NULL;
        }
        talloc_set_destructor(fde, kqueue_event_fd_destructor);

        kqueue_ev->panic_state = &panic_triggered;
        kqueue_check_reopen(kqueue_ev);
        if (panic_triggered) {
                return fde;
        }
        kqueue_ev->panic_state = NULL;

        kqueue_update_fd_event(kqueue_ev, fde);

        return fde;
}

/*
  set the fd event flags
*/
static void kqueue_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
{
        struct tevent_context *ev;
        struct kqueue_event_context *kqueue_ev;
        bool panic_triggered = false;

        if (fde->flags == flags) return;

        ev = fde->event_ctx;
        kqueue_ev = talloc_get_type_abort(ev->additional_data,
                                          struct kqueue_event_context);

        fde->flags = flags;

        kqueue_ev->panic_state = &panic_triggered;
        kqueue_check_reopen(kqueue_ev);
        if (panic_triggered) {
                return;
        }
        kqueue_ev->panic_state = NULL;

        kqueue_update_fd_event(kqueue_ev, fde);
}

/*
  event loop handling using kqueue
*/
static int kqueue_event_loop(struct kqueue_event_context *kqueue_ev, struct timeval *tvalp)
{
        int ret, i;
#define MAXEVENTS 1
        struct kevent kevs[MAXEVENTS];
        struct timespec _ts;
        const struct timespec *timeout = NULL;
        int kevent_errno;

        if (kqueue_ev->ev->signal_events &&
            tevent_common_check_signal(kqueue_ev->ev)) {
                return 0;
        }

        if (tvalp) {
                _ts.tv_sec = tvalp->tv_sec;
                _ts.tv_nsec = tvalp->tv_usec * 1000;
                timeout = &_ts;
        }

        tevent_trace_point_callback(kqueue_ev->ev, TEVENT_TRACE_BEFORE_WAIT);
        ret = kevent(kqueue_ev->kqueue_fd, NULL, 0, kevs, MAXEVENTS, timeout);
        kevent_errno = errno;
        tevent_trace_point_callback(kqueue_ev->ev, TEVENT_TRACE_AFTER_WAIT);

        if (ret == -1 && kevent_errno == EINTR && kqueue_ev->ev->signal_events) {
                if (tevent_common_check_signal(kqueue_ev->ev)) {
                        return 0;
                }
        }

        if (ret == -1 && kevent_errno != EINTR) {
                kqueue_panic(kqueue_ev, "kevent() failed", true);
                return -1;
        }

        if (ret == 0 && tvalp) {
                /* we don't care about a possible delay here */
                tevent_common_loop_timer_delay(kqueue_ev->ev);
                return 0;
        }

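        /*
         * deliver the returned events to the matching fde handlers,
         * only passing the flags each handler asked for
         */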
        for (i=0;i<ret;i++) {
                struct tevent_fd *fde = talloc_get_type(kevs[i].udata,
                                                        struct tevent_fd);
                uint16_t flags = 0;

                if (fde == NULL) {
                        kqueue_panic(kqueue_ev, "kevent() gave bad data",
                                     true);
                        return -1;
                }

                if (kevs[i].flags & EV_EOF) {
                        fde->additional_flags |= KQUEUE_ADDITIONAL_FD_FLAG_GOT_ERROR;
                        /*
                         * if we only wait for TEVENT_FD_WRITE, we should not
                         * tell the event handler about it, and remove the
                         * EVFILT_WRITE filter, as we only report errors when
                         * waiting for read events, to match the select()
                         * behavior
                         */
                        if (!(fde->additional_flags & KQUEUE_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
                                kqueue_update_fd_event(kqueue_ev, fde);
                                continue;
                        }
                        flags |= TEVENT_FD_READ;
                }
                if (kevs[i].filter == EVFILT_READ) {
                        flags |= TEVENT_FD_READ;
                }
                if (kevs[i].filter == EVFILT_WRITE) {
                        flags |= TEVENT_FD_WRITE;
                }

                /*
                 * make sure we only pass the flags
                 * the handler is expecting.
                 */
                flags &= fde->flags;
                if (flags) {
                        fde->handler(kqueue_ev->ev, fde, flags, fde->private_data);
                        break;
                }
        }

        return 0;
}

/*
  do a single event loop using the events defined in ev
*/
static int kqueue_event_loop_once(struct tevent_context *ev, const char *location)
{
        struct kqueue_event_context *kqueue_ev = talloc_get_type(ev->additional_data,
                                                           struct kqueue_event_context);
        struct timeval tval;
        bool panic_triggered = false;

        if (ev->signal_events &&
            tevent_common_check_signal(ev)) {
                return 0;
        }

        if (ev->immediate_events &&
            tevent_common_loop_immediate(ev)) {
                return 0;
        }

        tval = tevent_common_loop_timer_delay(ev);
        if (tevent_timeval_is_zero(&tval)) {
                return 0;
        }

        kqueue_ev->panic_state = &panic_triggered;
        kqueue_ev->panic_force_replay = true;
        kqueue_check_reopen(kqueue_ev);
        if (panic_triggered) {
                errno = EINVAL;
                return -1;
        }
        kqueue_ev->panic_force_replay = false;
        kqueue_ev->panic_state = NULL;

        return kqueue_event_loop(kqueue_ev, &tval);
}

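/*
  the backend operations exported to the generic tevent code
*/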
static const struct tevent_ops kqueue_event_ops = {
        .context_init           = kqueue_event_context_init,
        .add_fd                 = kqueue_event_add_fd,
        .set_fd_close_fn        = tevent_common_fd_set_close_fn,
        .get_fd_flags           = tevent_common_fd_get_flags,
        .set_fd_flags           = kqueue_event_set_fd_flags,
        .add_timer              = tevent_common_add_timer_v2,
        .schedule_immediate     = tevent_common_schedule_immediate,
        .add_signal             = tevent_common_add_signal,
        .loop_once              = kqueue_event_loop_once,
        .loop_wait              = tevent_common_loop_wait,
};

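/*
  register the kqueue backend with the generic tevent code.

  Once registered, callers should be able to select it explicitly,
  e.g. via tevent_set_default_backend("kqueue") or
  tevent_context_init_byname(mem_ctx, "kqueue"); untested here,
  but this matches how the other tevent backends are selected.
*/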
_PRIVATE_ bool tevent_kqueue_init(void)
{
        return tevent_register_backend("kqueue", &kqueue_event_ops);
}