--- /dev/null
+/*
+ * eventfd-aio-test by Davide Libenzi (test app for eventfd hooked into KAIO)
+ * Copyright (C) 2007 Davide Libenzi
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Davide Libenzi <davidel@xmailserver.org>
+ *
+ */
+
+#define _GNU_SOURCE
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <sys/signal.h>
+#include <sys/time.h>
+#include <sys/uio.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <poll.h>
+#include <fcntl.h>
+#include <time.h>
+#include <errno.h>
+
+
+/*
+ * These were good at the time of 2.6.21-rc5.mm4 ...
+ */
+#ifndef __NR_eventfd
+#if defined(__x86_64__)
+#define __NR_eventfd 283
+#elif defined(__i386__)
+#define __NR_eventfd 323
+#else
+#error Cannot detect your architecture!
+#endif
+#endif
+
+
+
+#define TESTFILE_SIZE (4096 * 5120)
+#define IORTX_SIZE (1024 * 4)
+#define NUM_EVENTS 128
+
+
+
+typedef unsigned long aio_context_t;
+
+enum {
+ IOCB_CMD_PREAD = 0,
+ IOCB_CMD_PWRITE = 1,
+ IOCB_CMD_FSYNC = 2,
+ IOCB_CMD_FDSYNC = 3,
+ /* These two are experimental.
+ * IOCB_CMD_PREADX = 4,
+ * IOCB_CMD_POLL = 5,
+ */
+ IOCB_CMD_NOOP = 6,
+ IOCB_CMD_PREADV = 7,
+ IOCB_CMD_PWRITEV = 8,
+};
+
+#if defined(__LITTLE_ENDIAN)
+#define PADDED(x,y) x, y
+#elif defined(__BIG_ENDIAN)
+#define PADDED(x,y) y, x
+#else
+#error edit for your odd byteorder.
+#endif
+
+#define IOCB_FLAG_RESFD (1 << 0)
+
+/*
+ * we always use a 64bit off_t when communicating
+ * with userland. it's up to libraries to do the
+ * proper padding and aio_error abstraction
+ */
+struct iocb {
+ /* these are internal to the kernel/libc. */
+ u_int64_t aio_data; /* data to be returned in event's data */
+ u_int32_t PADDED(aio_key, aio_reserved1);
+ /* the kernel sets aio_key to the req # */
+
+ /* common fields */
+ u_int16_t aio_lio_opcode; /* see IOCB_CMD_ above */
+ int16_t aio_reqprio;
+ u_int32_t aio_fildes;
+
+ u_int64_t aio_buf;
+ u_int64_t aio_nbytes;
+ int64_t aio_offset;
+
+ /* extra parameters */
+ u_int64_t aio_reserved2; /* TODO: use this for a (struct sigevent *) */
+
+ u_int32_t aio_flags;
+ /*
+ * If different from 0, this is an eventfd to deliver AIO results to
+ */
+ u_int32_t aio_resfd;
+}; /* 64 bytes */
+
+struct io_event {
+ u_int64_t data; /* the data field from the iocb */
+ u_int64_t obj; /* what iocb this event came from */
+ int64_t res; /* result code for this event */
+ int64_t res2; /* secondary result */
+};
+
+
+
+
+
+/* prepare a vectored AIO read of 'nr_segs' iovecs from 'fd' at 'offset';
+   completion is signalled on the eventfd 'afd' via IOCB_FLAG_RESFD */
+void asyio_prep_preadv(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+		       int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PREADV;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) iov;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* prepare a vectored AIO write; completion is signalled on eventfd 'afd' */
+void asyio_prep_pwritev(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+		        int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PWRITEV;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) iov;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* prepare a plain AIO read of 'nr_segs' bytes into 'buf' (note: for
+   PREAD the aio_nbytes field is a byte count, not a segment count) */
+void asyio_prep_pread(struct iocb *iocb, int fd, void *buf, int nr_segs,
+		      int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PREAD;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) buf;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* prepare a plain AIO write of 'nr_segs' bytes from 'buf' */
+void asyio_prep_pwrite(struct iocb *iocb, int fd, void const *buf, int nr_segs,
+		       int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PWRITE;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) buf;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* thin wrappers around the raw Linux AIO system calls (no libaio needed) */
+long io_setup(unsigned nr_reqs, aio_context_t *ctx) {
+
+	return syscall(__NR_io_setup, nr_reqs, ctx);
+}
+
+long io_destroy(aio_context_t ctx) {
+
+	return syscall(__NR_io_destroy, ctx);
+}
+
+long io_submit(aio_context_t ctx, long n, struct iocb **paiocb) {
+
+	return syscall(__NR_io_submit, ctx, n, paiocb);
+}
+
+long io_cancel(aio_context_t ctx, struct iocb *aiocb, struct io_event *res) {
+
+	return syscall(__NR_io_cancel, ctx, aiocb, res);
+}
+
+long io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events,
+		  struct timespec *tmo) {
+
+	return syscall(__NR_io_getevents, ctx, min_nr, nr, events, tmo);
+}
+
+/* create an eventfd with an initial counter value of 'count' */
+int eventfd(int count) {
+
+	return syscall(__NR_eventfd, count);
+}
+
+/*
+ * Poll the eventfd 'afd' for readability, i.e. for pending AIO results.
+ * 'timeo' is in milliseconds (-1 waits forever).
+ * Returns 1 if results are ready, 0 if none completed, -1 on poll error.
+ */
+long waitasync(int afd, int timeo) {
+	struct pollfd pfd;
+
+	pfd.fd = afd;
+	pfd.events = POLLIN;
+	pfd.revents = 0;
+	if (poll(&pfd, 1, timeo) < 0) {
+		perror("poll");
+		return -1;
+	}
+	if ((pfd.revents & POLLIN) == 0) {
+		fprintf(stderr, "no results completed\n");
+		return 0;
+	}
+
+	return 1;
+}
+
+/*
+ * Read back the first 'range' bytes of 'fd' using IORTX_SIZE sized AIO
+ * read requests, waiting for completions through the eventfd 'afd'.
+ * Returns the number of requests submitted, or -1 on error.
+ */
+long test_read(aio_context_t ctx, int fd, long range, int afd) {
+	long i, n, r, j;
+	u_int64_t eval;
+	struct iocb **piocb;
+	struct iocb *iocb;
+	struct timespec tmo;
+	static struct io_event events[NUM_EVENTS];
+	static char buf[IORTX_SIZE];
+
+	n = range / IORTX_SIZE;
+	iocb = malloc(n * sizeof(struct iocb));
+	piocb = malloc(n * sizeof(struct iocb *));
+	if (!iocb || !piocb) {
+		perror("iocb alloc");
+		/* free(NULL) is a no-op, so this is safe if only one alloc failed */
+		free(iocb);
+		free(piocb);
+		return -1;
+	}
+	for (i = 0; i < n; i++) {
+		piocb[i] = &iocb[i];
+		asyio_prep_pread(&iocb[i], fd, buf, sizeof(buf),
+				 (n - i - 1) * IORTX_SIZE, afd);
+		iocb[i].aio_data = (u_int64_t) i + 1;
+	}
+	fprintf(stdout, "submitting read request ...\n");
+	if (io_submit(ctx, n, piocb) <= 0) {
+		perror("io_submit");
+		free(iocb);
+		free(piocb);
+		return -1;
+	}
+	for (i = 0; i < n;) {
+		fprintf(stdout, "waiting ... ");
+		waitasync(afd, -1);
+		eval = 0;
+		/* the eventfd counter tells us how many results are ready */
+		if (read(afd, &eval, sizeof(eval)) != sizeof(eval))
+			perror("read");
+		fprintf(stdout, "done! %llu\n", (unsigned long long) eval);
+		while (eval > 0) {
+			tmo.tv_sec = 0;
+			tmo.tv_nsec = 0;
+			r = io_getevents(ctx, 1, eval > NUM_EVENTS ? NUM_EVENTS: (long) eval,
+					 events, &tmo);
+			if (r > 0) {
+				for (j = 0; j < r; j++) {
+
+				}
+				i += r;
+				eval -= r;
+				fprintf(stdout, "test_read got %ld/%ld results so far\n",
+					i, n);
+			}
+		}
+	}
+	free(iocb);
+	free(piocb);
+
+	return n;
+}
+
+/*
+ * Fill the first 'range' bytes of 'fd' using IORTX_SIZE sized AIO write
+ * requests, waiting for completions through the eventfd 'afd'.
+ * Returns the number of requests submitted, or -1 on error.
+ */
+long test_write(aio_context_t ctx, int fd, long range, int afd) {
+	long i, n, r, j;
+	u_int64_t eval;
+	struct iocb **piocb;
+	struct iocb *iocb;
+	struct timespec tmo;
+	static struct io_event events[NUM_EVENTS];
+	static char buf[IORTX_SIZE];
+
+	/* fill the buffer with a repeating 0..255 byte pattern */
+	for (i = 0; i < IORTX_SIZE; i++)
+		buf[i] = i & 0xff;
+	n = range / IORTX_SIZE;
+	iocb = malloc(n * sizeof(struct iocb));
+	piocb = malloc(n * sizeof(struct iocb *));
+	if (!iocb || !piocb) {
+		perror("iocb alloc");
+		/* free(NULL) is a no-op, so this is safe if only one alloc failed */
+		free(iocb);
+		free(piocb);
+		return -1;
+	}
+	for (i = 0; i < n; i++) {
+		piocb[i] = &iocb[i];
+		asyio_prep_pwrite(&iocb[i], fd, buf, sizeof(buf),
+				  (n - i - 1) * IORTX_SIZE, afd);
+		iocb[i].aio_data = (u_int64_t) i + 1;
+	}
+	fprintf(stdout, "submitting write request ...\n");
+	if (io_submit(ctx, n, piocb) <= 0) {
+		perror("io_submit");
+		free(iocb);
+		free(piocb);
+		return -1;
+	}
+	for (i = 0; i < n;) {
+		fprintf(stdout, "waiting ... ");
+		waitasync(afd, -1);
+		eval = 0;
+		/* the eventfd counter tells us how many results are ready */
+		if (read(afd, &eval, sizeof(eval)) != sizeof(eval))
+			perror("read");
+		fprintf(stdout, "done! %llu\n", (unsigned long long) eval);
+		while (eval > 0) {
+			tmo.tv_sec = 0;
+			tmo.tv_nsec = 0;
+			r = io_getevents(ctx, 1, eval > NUM_EVENTS ? NUM_EVENTS: (long) eval,
+					 events, &tmo);
+			if (r > 0) {
+				for (j = 0; j < r; j++) {
+
+				}
+				i += r;
+				eval -= r;
+				fprintf(stdout, "test_write got %ld/%ld results so far\n",
+					i, n);
+			}
+		}
+	}
+	free(iocb);
+	free(piocb);
+
+	return n;
+}
+
+/*
+ * Create an eventfd, an AIO context and a test file, then run the write
+ * and read AIO tests against it. Returns 0 on success, > 0 on error.
+ */
+int main(int ac, char **av) {
+	int afd, fd;
+	aio_context_t ctx = 0;
+	char const *testfn = "/tmp/eventfd-aio-test.data";
+
+	fprintf(stdout, "creating an eventfd ...\n");
+	if ((afd = eventfd(0)) == -1) {
+		perror("eventfd");
+		return 2;
+	}
+	fprintf(stdout, "done! eventfd = %d\n", afd);
+	if (io_setup(TESTFILE_SIZE / IORTX_SIZE + 256, &ctx)) {
+		perror("io_setup");
+		return 3;
+	}
+	if ((fd = open(testfn, O_RDWR | O_CREAT, 0644)) == -1) {
+		perror(testfn);
+		return 4;
+	}
+	/* size the file up front so the AIO reads/writes stay in bounds */
+	if (ftruncate(fd, TESTFILE_SIZE) == -1) {
+		perror("ftruncate");
+		return 5;
+	}
+
+	/* the eventfd must be non-blocking: its counter is drained after poll() */
+	fcntl(afd, F_SETFL, fcntl(afd, F_GETFL, 0) | O_NONBLOCK);
+
+	test_write(ctx, fd, TESTFILE_SIZE, afd);
+	test_read(ctx, fd, TESTFILE_SIZE, afd);
+
+	io_destroy(ctx);
+	close(fd);
+	close(afd);
+	remove(testfn);
+
+	return 0;
+}
+
--- /dev/null
+/*
+ * eventfd-aio-test by Davide Libenzi (test app for eventfd hooked into KAIO)
+ * Copyright (C) 2007 Davide Libenzi
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 2 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program; if not, write to the Free Software
+ * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
+ *
+ * Davide Libenzi <davidel@xmailserver.org>
+ *
+ */
+
+#define _GNU_SOURCE
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <sys/signal.h>
+#include <sys/time.h>
+#include <sys/uio.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <poll.h>
+#include <fcntl.h>
+#include <time.h>
+#include <errno.h>
+
+
+/*
+ * These were good at the time of 2.6.21-rc5.mm4 ...
+ */
+#ifndef __NR_eventfd
+#if defined(__x86_64__)
+#define __NR_eventfd 283
+#elif defined(__i386__)
+#define __NR_eventfd 323
+#else
+#error Cannot detect your architecture!
+#endif
+#endif
+
+
+
+#define TESTFILE_SIZE (4096 * 5120)
+#define IORTX_SIZE (1024 * 4)
+#define NUM_EVENTS 128
+
+
+
+typedef unsigned long aio_context_t;
+
+enum {
+ IOCB_CMD_PREAD = 0,
+ IOCB_CMD_PWRITE = 1,
+ IOCB_CMD_FSYNC = 2,
+ IOCB_CMD_FDSYNC = 3,
+ /* These two are experimental.
+ * IOCB_CMD_PREADX = 4,
+ * IOCB_CMD_POLL = 5,
+ */
+ IOCB_CMD_NOOP = 6,
+ IOCB_CMD_PREADV = 7,
+ IOCB_CMD_PWRITEV = 8,
+};
+
+#if defined(__LITTLE_ENDIAN)
+#define PADDED(x,y) x, y
+#elif defined(__BIG_ENDIAN)
+#define PADDED(x,y) y, x
+#else
+#error edit for your odd byteorder.
+#endif
+
+#define IOCB_FLAG_RESFD (1 << 0)
+
+/*
+ * we always use a 64bit off_t when communicating
+ * with userland. it's up to libraries to do the
+ * proper padding and aio_error abstraction
+ */
+struct iocb {
+ /* these are internal to the kernel/libc. */
+ u_int64_t aio_data; /* data to be returned in event's data */
+ u_int32_t PADDED(aio_key, aio_reserved1);
+ /* the kernel sets aio_key to the req # */
+
+ /* common fields */
+ u_int16_t aio_lio_opcode; /* see IOCB_CMD_ above */
+ int16_t aio_reqprio;
+ u_int32_t aio_fildes;
+
+ u_int64_t aio_buf;
+ u_int64_t aio_nbytes;
+ int64_t aio_offset;
+
+ /* extra parameters */
+ u_int64_t aio_reserved2; /* TODO: use this for a (struct sigevent *) */
+
+ u_int32_t aio_flags;
+ /*
+ * If different from 0, this is an eventfd to deliver AIO results to
+ */
+ u_int32_t aio_resfd;
+}; /* 64 bytes */
+
+struct io_event {
+ u_int64_t data; /* the data field from the iocb */
+ u_int64_t obj; /* what iocb this event came from */
+ int64_t res; /* result code for this event */
+ int64_t res2; /* secondary result */
+};
+
+
+
+
+
+/* prepare a vectored AIO read; completion is signalled on eventfd 'afd' */
+static void asyio_prep_preadv(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+			      int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PREADV;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) iov;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* prepare a vectored AIO write; completion is signalled on eventfd 'afd' */
+static void asyio_prep_pwritev(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+			       int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PWRITEV;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) iov;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* prepare a plain AIO read into 'buf' (aio_nbytes is a byte count here) */
+static void asyio_prep_pread(struct iocb *iocb, int fd, void *buf, int nr_segs,
+			     int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PREAD;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) buf;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* prepare a plain AIO write from 'buf' (aio_nbytes is a byte count here) */
+static void asyio_prep_pwrite(struct iocb *iocb, int fd, void const *buf, int nr_segs,
+			      int64_t offset, int afd)
+{
+	memset(iocb, 0, sizeof(*iocb));
+	iocb->aio_fildes = fd;
+	iocb->aio_lio_opcode = IOCB_CMD_PWRITE;
+	iocb->aio_reqprio = 0;
+	iocb->aio_buf = (u_int64_t) buf;
+	iocb->aio_nbytes = nr_segs;
+	iocb->aio_offset = offset;
+	iocb->aio_flags = IOCB_FLAG_RESFD;
+	iocb->aio_resfd = afd;
+}
+
+/* direct syscall wrappers for the kernel AIO interface (no libaio) */
+static long io_setup(unsigned nr_reqs, aio_context_t *ctx) {
+
+	return syscall(__NR_io_setup, nr_reqs, ctx);
+}
+
+static long io_destroy(aio_context_t ctx) {
+
+	return syscall(__NR_io_destroy, ctx);
+}
+
+static long io_submit(aio_context_t ctx, long n, struct iocb **paiocb) {
+
+	return syscall(__NR_io_submit, ctx, n, paiocb);
+}
+
+static long io_cancel(aio_context_t ctx, struct iocb *aiocb, struct io_event *res) {
+
+	return syscall(__NR_io_cancel, ctx, aiocb, res);
+}
+
+static long io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events,
+			 struct timespec *tmo) {
+
+	return syscall(__NR_io_getevents, ctx, min_nr, nr, events, tmo);
+}
+
+/* create an eventfd with an initial counter value of 'count' */
+static int eventfd(int count) {
+
+	return syscall(__NR_eventfd, count);
+}
--- /dev/null
+/*
+ Unix SMB/CIFS implementation.
+
+ main select loop and event handling - aio/epoll hybrid implementation
+
+ Copyright (C) Andrew Tridgell 2006
+
+ based on events_standard.c
+
+ This program is free software; you can redistribute it and/or modify
+ it under the terms of the GNU General Public License as published by
+ the Free Software Foundation; either version 3 of the License, or
+ (at your option) any later version.
+
+ This program is distributed in the hope that it will be useful,
+ but WITHOUT ANY WARRANTY; without even the implied warranty of
+ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ GNU General Public License for more details.
+
+ You should have received a copy of the GNU General Public License
+ along with this program. If not, see <http://www.gnu.org/licenses/>.
+*/
+/*
+ this is a very strange beast. The Linux AIO implementation doesn't
+ yet integrate properly with epoll, but there is a kernel patch that
+ allows the aio wait primitives to be used to wait for epoll events,
+ and this can be used to give us a unified event system incorporating
+ both aio events and epoll events
+
+ this is _very_ experimental code
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "system/select.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+
+#include "eventsfd-aio-test.h"
+
+struct aio_event_context {
+ /* a pointer back to the generic event_context */
+ struct tevent_context *ev;
+
+ uint32_t destruction_count;
+
+ int eventfd;
+
+ io_context_t ioctx;
+
+ struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
+
+ struct iocb *epoll_iocb;
+
+ int epoll_fd;
+ int is_epoll_set;
+ pid_t pid;
+};
+
+struct tevent_aio {
+ struct tevent_context *event_ctx;
+ struct iocb iocb;
+ void *private_data;
+ tevent_aio_handler_t handler;
+};
+
+/*
+  map from TEVENT_FD_READ/WRITE to EPOLLIN/EPOLLOUT; error and hangup
+  conditions are always requested alongside the chosen direction
+*/
+static uint32_t epoll_map_flags(uint16_t flags)
+{
+	uint32_t ret = 0;
+	if (flags & TEVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
+	if (flags & TEVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
+	return ret;
+}
+
+/*
+  free the io context and the epoll fd when the event context goes away
+*/
+static int aio_ctx_destructor(struct aio_event_context *aio_ev)
+{
+	io_queue_release(aio_ev->ioctx);
+	close(aio_ev->epoll_fd);
+	aio_ev->epoll_fd = -1;
+	return 0;
+}
+
+static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde);
+
+/*
+  reopen the epoll handle when our pid changes
+  see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for a
+  demonstration of why this is needed
+ */
+static void epoll_check_reopen(struct aio_event_context *aio_ev)
+{
+	struct tevent_fd *fde;
+
+	/* nothing to do unless we have forked since the epoll fd was created */
+	if (aio_ev->pid == getpid()) {
+		return;
+	}
+
+	close(aio_ev->epoll_fd);
+	aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
+	if (aio_ev->epoll_fd == -1) {
+		tevent_debug(aio_ev->ev, TEVENT_DEBUG_FATAL,
+			     "Failed to recreate epoll handle after fork\n");
+		return;
+	}
+	aio_ev->pid = getpid();
+	/* re-register every tracked fd with the fresh epoll instance */
+	for (fde=aio_ev->ev->fd_events;fde;fde=fde->next) {
+		epoll_add_event(aio_ev, fde);
+	}
+}
+
+/* per-fde state bits kept in fde->additional_flags */
+#define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT	(1<<0)
+#define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR	(1<<1)
+#define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR	(1<<2)
+
+/*
+  add the epoll event to the given fd_event
+*/
+static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+	struct epoll_event event;
+	if (aio_ev->epoll_fd == -1) return;
+
+	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+	/* if we don't want events yet, don't add an aio_event */
+	if (fde->flags == 0) return;
+
+	memset(&event, 0, sizeof(event));
+	event.events = epoll_map_flags(fde->flags);
+	event.data.ptr = fde;
+	/* NOTE(review): the epoll_ctl() return value is ignored here, so a
+	   failed registration goes unnoticed — consider checking it */
+	epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
+	fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+
+	/* only if we want to read we want to tell the event handler about errors */
+	if (fde->flags & TEVENT_FD_READ) {
+		fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+	}
+}
+
+/*
+  delete the epoll event for given fd_event
+*/
+static void epoll_del_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+	struct epoll_event event;
+
+	if (aio_ev->epoll_fd == -1) return;
+
+	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+	/* if there's no aio_event, we don't need to delete it */
+	if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
+
+	ZERO_STRUCT(event);
+	event.events = epoll_map_flags(fde->flags);
+	event.data.ptr = fde;
+	epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
+
+	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+}
+
+/*
+  change the epoll event to the given fd_event
+*/
+static void epoll_mod_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+	struct epoll_event event;
+	if (aio_ev->epoll_fd == -1) return;
+
+	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+	ZERO_STRUCT(event);
+	event.events = epoll_map_flags(fde->flags);
+	event.data.ptr = fde;
+	epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
+
+	/* only if we want to read we want to tell the event handler about errors */
+	if (fde->flags & TEVENT_FD_READ) {
+		fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+	}
+}
+
+/* pick add/mod/del for the fde's epoll registration based on its current
+   flags and whether an error has already been seen on the fd */
+static void epoll_change_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+	bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
+	bool want_read = (fde->flags & TEVENT_FD_READ);
+	bool want_write= (fde->flags & TEVENT_FD_WRITE);
+
+	if (aio_ev->epoll_fd == -1) return;
+
+	fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+	/* there's already an event */
+	if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
+		if (want_read || (want_write && !got_error)) {
+			epoll_mod_event(aio_ev, fde);
+			return;
+		}
+		epoll_del_event(aio_ev, fde);
+		return;
+	}
+
+	/* there's no aio_event attached to the fde */
+	if (want_read || (want_write && !got_error)) {
+		epoll_add_event(aio_ev, fde);
+		return;
+	}
+}
+
+/* (re)arm the special epoll-wait iocb; its completion carries the epoll
+   events in aio_ev->epevent. Returns 0 on success, -1 if io_submit fails. */
+static int setup_epoll_wait(struct aio_event_context *aio_ev)
+{
+	/* the iocb is one-shot: only submit it when it is not in flight */
+	if (aio_ev->is_epoll_set) {
+		return 0;
+	}
+	memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
+	aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
+	aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
+	aio_ev->epoll_iocb->aio_reqprio = 0;
+
+	aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
+	aio_ev->epoll_iocb->u.c.offset = -1;
+	aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
+
+	if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
+		return -1;
+	}
+	aio_ev->is_epoll_set = 1;
+
+	return 0;
+}
+
+
+/*
+  event loop handling using aio/epoll hybrid
+*/
+static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
+{
+	int ret, i;
+	/* detect fde destruction from inside a handler: any fde destructor
+	   bumps this counter, telling us the events[] contents may be stale */
+	uint32_t destruction_count = ++aio_ev->destruction_count;
+	struct timespec timeout;
+	struct io_event events[8];
+
+	if (aio_ev->epoll_fd == -1) return -1;
+
+	if (aio_ev->ev->signal_events &&
+	    tevent_common_check_signal(aio_ev->ev)) {
+		return 0;
+	}
+
+	/* convert the timeval to the timespec io_getevents() expects */
+	if (tvalp) {
+		timeout.tv_sec = tvalp->tv_sec;
+		timeout.tv_nsec = tvalp->tv_usec;
+		timeout.tv_nsec *= 1000;
+	}
+
+	/* make sure the epoll-wait iocb is in flight before we block */
+	if (setup_epoll_wait(aio_ev) < 0)
+		return -1;
+
+	ret = io_getevents(aio_ev->ioctx, 1, 8,
+			   events, tvalp?&timeout:NULL);
+
+	if (ret == -EINTR) {
+		if (aio_ev->ev->signal_events) {
+			tevent_common_check_signal(aio_ev->ev);
+		}
+		return 0;
+	}
+
+	if (ret == 0 && tvalp) {
+		/* we don't care about a possible delay here */
+		tevent_common_loop_timer_delay(aio_ev->ev);
+		return 0;
+	}
+
+	for (i=0;i<ret;i++) {
+		struct io_event *event = &events[i];
+		struct iocb *finished = event->obj;
+
+		switch (finished->aio_lio_opcode) {
+		case IO_CMD_PWRITE:
+		case IO_CMD_PREAD: {
+			/* a disk IO completion: hand the result to its handler */
+			struct tevent_aio *ae = talloc_get_type(finished->data,
+								struct tevent_aio);
+			if (ae) {
+				talloc_set_destructor(ae, NULL);
+				ae->handler(ae->event_ctx, ae,
+					    event->res, ae->private_data);
+				talloc_free(ae);
+			}
+			break;
+		}
+		case IOCB_CMD_EPOLL_WAIT: {
+			/* the epoll-wait iocb completed: event->res epoll
+			   events are waiting in the buffer it was given */
+			struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
+			struct tevent_fd *fde;
+			uint16_t flags = 0;
+			int j;
+
+			/* it is one-shot; the next loop pass resubmits it */
+			aio_ev->is_epoll_set = 0;
+
+			for (j=0; j<event->res; j++, ep++) {
+				fde = talloc_get_type(ep->data.ptr,
+						      struct tevent_fd);
+				if (fde == NULL) {
+					return -1;
+				}
+				if (ep->events & (EPOLLHUP|EPOLLERR)) {
+					fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
+					if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
+						epoll_del_event(aio_ev, fde);
+						continue;
+					}
+					/* error conditions are reported as readable */
+					flags |= TEVENT_FD_READ;
+				}
+				if (ep->events & EPOLLIN) flags |= TEVENT_FD_READ;
+				if (ep->events & EPOLLOUT) flags |= TEVENT_FD_WRITE;
+				if (flags) {
+					fde->handler(aio_ev->ev, fde, flags, fde->private_data);
+				}
+			}
+			break;
+		}
+		}
+		if (destruction_count != aio_ev->destruction_count) {
+			return 0;
+		}
+	}
+
+	return 0;
+}
+
+/*
+  create a aio_event_context structure.
+*/
+static int aio_event_context_init(struct tevent_context *ev)
+{
+	struct aio_event_context *aio_ev;
+
+	aio_ev = talloc_zero(ev, struct aio_event_context);
+	if (!aio_ev) return -1;
+
+	aio_ev->ev = ev;
+	aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
+	/* setup_epoll_wait() dereferences epoll_iocb, so fail early on OOM */
+	if (!aio_ev->epoll_iocb) {
+		talloc_free(aio_ev);
+		return -1;
+	}
+
+	if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
+		talloc_free(aio_ev);
+		return -1;
+	}
+
+	aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
+	if (aio_ev->epoll_fd == -1) {
+		/* the destructor is not installed yet, so release the io
+		   context by hand to avoid leaking it */
+		io_queue_release(aio_ev->ioctx);
+		talloc_free(aio_ev);
+		return -1;
+	}
+	aio_ev->pid = getpid();
+
+	/* from here on the destructor releases the ioctx and epoll fd */
+	talloc_set_destructor(aio_ev, aio_ctx_destructor);
+
+	ev->additional_data = aio_ev;
+
+	if (setup_epoll_wait(aio_ev) < 0) {
+		talloc_free(aio_ev);
+		return -1;
+	}
+
+	return 0;
+}
+
+/*
+  destroy an fd_event
+*/
+static int aio_event_fd_destructor(struct tevent_fd *fde)
+{
+	struct tevent_context *ev = fde->event_ctx;
+	struct aio_event_context *aio_ev = NULL;
+
+	if (ev) {
+		aio_ev = talloc_get_type(ev->additional_data,
+					 struct aio_event_context);
+
+		epoll_check_reopen(aio_ev);
+
+		/* tell a running aio_event_loop() its event list is stale */
+		aio_ev->destruction_count++;
+
+		epoll_del_event(aio_ev, fde);
+	}
+
+	return tevent_common_fd_destructor(fde);
+}
+
+/*
+  add a fd based event
+  return NULL on failure (memory allocation error)
+*/
+static struct tevent_fd *aio_event_add_fd(struct tevent_context *ev, TALLOC_CTX *mem_ctx,
+					  int fd, uint16_t flags,
+					  tevent_fd_handler_t handler,
+					  void *private_data,
+					  const char *handler_name,
+					  const char *location)
+{
+	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+							   struct aio_event_context);
+	struct tevent_fd *fde;
+
+	epoll_check_reopen(aio_ev);
+
+	fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
+				   handler, private_data,
+				   handler_name, location);
+	if (!fde) return NULL;
+
+	talloc_set_destructor(fde, aio_event_fd_destructor);
+
+	epoll_add_event(aio_ev, fde);
+
+	return fde;
+}
+
+/*
+  set the fd event flags
+*/
+static void aio_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
+{
+	struct tevent_context *ev;
+	struct aio_event_context *aio_ev;
+
+	if (fde->flags == flags) return;
+
+	ev = fde->event_ctx;
+	aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
+
+	fde->flags = flags;
+
+	epoll_check_reopen(aio_ev);
+
+	epoll_change_event(aio_ev, fde);
+}
+
+/*
+  do a single event loop using the events defined in ev
+*/
+static int aio_event_loop_once(struct tevent_context *ev)
+{
+	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+							   struct aio_event_context);
+	struct timeval tval;
+
+	/* a zero delay means a timer already fired and has been handled */
+	tval = tevent_common_loop_timer_delay(ev);
+	if (ev_timeval_is_zero(&tval)) {
+		return 0;
+	}
+
+	epoll_check_reopen(aio_ev);
+
+	return aio_event_loop(aio_ev, &tval);
+}
+
+/*
+  return on failure or (with 0) if all fd events are removed
+*/
+static int aio_event_loop_wait(struct tevent_context *ev)
+{
+	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+							   struct aio_event_context);
+	while (aio_ev->ev->fd_events) {
+		if (aio_event_loop_once(ev) != 0) {
+			break;
+		}
+	}
+
+	return 0;
+}
+
+/*
+  called when a disk IO event needs to be cancelled
+*/
+static int aio_destructor(struct tevent_aio *ae)
+{
+	struct tevent_context *ev = ae->event_ctx;
+	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+							   struct aio_event_context);
+	struct io_event result;
+	io_cancel(aio_ev->ioctx, &ae->iocb, &result);
+	/* TODO: handle errors from io_cancel()! */
+	return 0;
+}
+
+/*
+  submit an aio disk IO event; 'iocb' is copied, and the returned
+  tevent_aio owns the in-flight request (freeing it cancels the IO).
+  Returns NULL on allocation or submission failure.
+*/
+static struct tevent_aio *aio_event_add_aio(struct tevent_context *ev,
+					    TALLOC_CTX *mem_ctx,
+					    struct iocb *iocb,
+					    tevent_aio_handler_t handler,
+					    void *private_data,
+					    const char *handler_name,
+					    const char *location)
+{
+	struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+							   struct aio_event_context);
+	struct iocb *iocbp;
+	struct tevent_aio *ae = talloc(mem_ctx?mem_ctx:ev, struct tevent_aio);
+	if (ae == NULL) return NULL;
+
+	ae->event_ctx = ev;
+	ae->iocb = *iocb;
+	ae->handler = handler;
+	ae->private_data = private_data;
+	iocbp = &ae->iocb;
+
+	/* set the completion cookie BEFORE submission: the kernel takes its
+	   copy of the iocb at io_submit() time, so assigning it afterwards
+	   would leave the submitted request without its back pointer */
+	ae->iocb.data = ae;
+
+	if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
+		talloc_free(ae);
+		return NULL;
+	}
+	talloc_set_destructor(ae, aio_destructor);
+
+	return ae;
+}
+
+/* backend dispatch table; timers and signals use the common helpers */
+static const struct tevent_ops aio_event_ops = {
+	.context_init	= aio_event_context_init,
+	.add_fd		= aio_event_add_fd,
+	.add_aio	= aio_event_add_aio,
+	.set_fd_close_fn= tevent_common_fd_set_close_fn,
+	.get_fd_flags	= tevent_common_fd_get_flags,
+	.set_fd_flags	= aio_event_set_fd_flags,
+	.add_timer	= tevent_common_add_timer,
+	.add_signal	= tevent_common_add_signal,
+	.loop_once	= aio_event_loop_once,
+	.loop_wait	= aio_event_loop_wait,
+};
+
+/* register this backend with tevent under the name "aio" */
+bool tevent_aio_init(void)
+{
+	return tevent_register_backend("aio", &aio_event_ops);
+}
+
void *additional_data;
};
+/* an outstanding aio disk-IO request tracked by the event context */
+struct tevent_aio {
+	struct tevent_aio *prev, *next;
+	struct tevent_context *event_ctx;
+	int fd;
+	uint8_t *buffer;
+	uint64_t length;
+	uint64_t offset;
+	tevent_aio_handler_t handler;
+	/* this is private for the specific handler */
+	void *private_data;
+	/* this is for debugging only! */
+	const char *handler_name;
+	const char *location;
+	/* this is private for the events_ops implementation */
+	void *additional_data;
+};
+
struct tevent_threaded_context {
struct tevent_threaded_context *next, *prev;
pthread_mutex_t scheduled_mutex;
struct tevent_immediate *scheduled_immediates;
+ /* list of aio events - used by common code */
+ struct tevent_aio *aio_events;
+
/* this is private for the events_ops implementation */
void *additional_data;
int wakeup_read_fd;
#endif
+ /* aio eventfd() */
+ struct tevent_fd *pipe_fde;
+
/* debugging operations */
struct tevent_debug_ops debug_ops;