eventfd-aio ...
authorStefan Metzmacher <metze@samba.org>
Tue, 6 Jan 2009 10:53:36 +0000 (11:53 +0100)
committerStefan Metzmacher <metze@samba.org>
Thu, 17 May 2018 07:51:48 +0000 (09:51 +0200)
lib/tevent/eventfd-aio-test.c [new file with mode: 0644]
lib/tevent/eventfd-aio-test.h [new file with mode: 0644]
lib/tevent/tevent_aio2.c [new file with mode: 0644]
lib/tevent/tevent_internal.h

diff --git a/lib/tevent/eventfd-aio-test.c b/lib/tevent/eventfd-aio-test.c
new file mode 100644 (file)
index 0000000..1f29fa4
--- /dev/null
@@ -0,0 +1,380 @@
+/*
+ *  eventfd-aio-test by Davide Libenzi (test app for eventfd hooked into KAIO)
+ *  Copyright (C) 2007  Davide Libenzi
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ *  Davide Libenzi <davidel@xmailserver.org>
+ *
+ */
+
+#define _GNU_SOURCE
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <sys/signal.h>
+#include <sys/time.h>
+#include <sys/uio.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <poll.h>
+#include <fcntl.h>
+#include <time.h>
+#include <errno.h>
+
+
+/*
+ * This were good at the time of 2.6.21-rc5.mm4 ...
+ */
+#ifndef __NR_eventfd
+#if defined(__x86_64__)
+#define __NR_eventfd 283
+#elif defined(__i386__)
+#define __NR_eventfd 323
+#else
+#error Cannot detect your architecture!
+#endif
+#endif
+
+
+
+#define TESTFILE_SIZE (4096 * 5120)
+#define IORTX_SIZE (1024 * 4)
+#define NUM_EVENTS 128
+
+
+
+typedef unsigned long aio_context_t;
+
+enum {
+       IOCB_CMD_PREAD = 0,
+               IOCB_CMD_PWRITE = 1,
+               IOCB_CMD_FSYNC = 2,
+               IOCB_CMD_FDSYNC = 3,
+               /* These two are experimental.
+                * IOCB_CMD_PREADX = 4,
+                * IOCB_CMD_POLL = 5,
+                */
+               IOCB_CMD_NOOP = 6,
+               IOCB_CMD_PREADV = 7,
+               IOCB_CMD_PWRITEV = 8,
+};
+
+#if defined(__LITTLE_ENDIAN)
+#define PADDED(x,y)    x, y
+#elif defined(__BIG_ENDIAN)
+#define PADDED(x,y)    y, x
+#else
+#error edit for your odd byteorder.
+#endif
+
+#define IOCB_FLAG_RESFD                (1 << 0)
+
+/*
+ * we always use a 64bit off_t when communicating
+ * with userland.  its up to libraries to do the
+ * proper padding and aio_error abstraction
+ */
+struct iocb {
+       /* these are internal to the kernel/libc. */
+       u_int64_t       aio_data;       /* data to be returned in event's data */
+       u_int32_t       PADDED(aio_key, aio_reserved1);
+       /* the kernel sets aio_key to the req # */
+
+       /* common fields */
+       u_int16_t       aio_lio_opcode; /* see IOCB_CMD_ above */
+       int16_t aio_reqprio;
+       u_int32_t       aio_fildes;
+
+       u_int64_t       aio_buf;
+       u_int64_t       aio_nbytes;
+       int64_t aio_offset;
+
+       /* extra parameters */
+       u_int64_t       aio_reserved2;  /* TODO: use this for a (struct sigevent *) */
+
+       u_int32_t       aio_flags;
+       /*
+        * If different from 0, this is an eventfd to deliver AIO results to
+        */
+       u_int32_t       aio_resfd;
+}; /* 64 bytes */
+
+struct io_event {
+       u_int64_t           data;           /* the data field from the iocb */
+       u_int64_t           obj;            /* what iocb this event came from */
+       int64_t           res;            /* result code for this event */
+       int64_t           res2;           /* secondary result */
+};
+
+
+
+
+
+void asyio_prep_preadv(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+                      int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PREADV;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) iov;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+void asyio_prep_pwritev(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+                       int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PWRITEV;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) iov;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+void asyio_prep_pread(struct iocb *iocb, int fd, void *buf, int nr_segs,
+                     int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PREAD;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) buf;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+void asyio_prep_pwrite(struct iocb *iocb, int fd, void const *buf, int nr_segs,
+                      int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PWRITE;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) buf;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+long io_setup(unsigned nr_reqs, aio_context_t *ctx) {
+
+       return syscall(__NR_io_setup, nr_reqs, ctx);
+}
+
+long io_destroy(aio_context_t ctx) {
+
+       return syscall(__NR_io_destroy, ctx);
+}
+
+long io_submit(aio_context_t ctx, long n, struct iocb **paiocb) {
+
+       return syscall(__NR_io_submit, ctx, n, paiocb);
+}
+
+long io_cancel(aio_context_t ctx, struct iocb *aiocb, struct io_event *res) {
+
+       return syscall(__NR_io_cancel, ctx, aiocb, res);
+}
+
+long io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events,
+                 struct timespec *tmo) {
+
+       return syscall(__NR_io_getevents, ctx, min_nr, nr, events, tmo);
+}
+
+int eventfd(int count) {
+
+       return syscall(__NR_eventfd, count);
+}
+
+long waitasync(int afd, int timeo) {
+       struct pollfd pfd;
+
+       pfd.fd = afd;
+       pfd.events = POLLIN;
+       pfd.revents = 0;
+       if (poll(&pfd, 1, timeo) < 0) {
+               perror("poll");
+               return -1;
+       }
+       if ((pfd.revents & POLLIN) == 0) {
+               fprintf(stderr, "no results completed\n");
+               return 0;
+       }
+
+       return 1;
+}
+
+long test_read(aio_context_t ctx, int fd, long range, int afd) {
+       long i, n, r, j;
+       u_int64_t eval;
+       struct iocb **piocb;
+       struct iocb *iocb;
+       struct timespec tmo;
+       static struct io_event events[NUM_EVENTS];
+       static char buf[IORTX_SIZE];
+
+       n = range / IORTX_SIZE;
+       iocb = malloc(n * sizeof(struct iocb));
+       piocb = malloc(n * sizeof(struct iocb *));
+       if (!iocb || !piocb) {
+               perror("iocb alloc");
+               return -1;
+       }
+       for (i = 0; i < n; i++) {
+               piocb[i] = &iocb[i];
+               asyio_prep_pread(&iocb[i], fd, buf, sizeof(buf),
+                                (n - i - 1) * IORTX_SIZE, afd);
+               iocb[i].aio_data = (u_int64_t) i + 1;
+       }
+       fprintf(stdout, "submitting read request ...\n");
+       if (io_submit(ctx, n, piocb) <= 0) {
+               perror("io_submit");
+               return -1;
+       }
+       for (i = 0; i < n;) {
+               fprintf(stdout, "waiting ... ");
+               waitasync(afd, -1);
+               eval = 0;
+               if (read(afd, &eval, sizeof(eval)) != sizeof(eval))
+                       perror("read");
+               fprintf(stdout, "done! %llu\n", (unsigned long long) eval);
+               while (eval > 0) {
+                       tmo.tv_sec = 0;
+                       tmo.tv_nsec = 0;
+                       r = io_getevents(ctx, 1, eval > NUM_EVENTS ? NUM_EVENTS: (long) eval,
+                                        events, &tmo);
+                       if (r > 0) {
+                               for (j = 0; j < r; j++) {
+
+                               }
+                               i += r;
+                               eval -= r;
+                               fprintf(stdout, "test_write got %ld/%ld results so far\n",
+                                       i, n);
+                       }
+               }
+       }
+       free(iocb);
+       free(piocb);
+
+       return n;
+}
+
+long test_write(aio_context_t ctx, int fd, long range, int afd) {
+       long i, n, r, j;
+       u_int64_t eval;
+       struct iocb **piocb;
+       struct iocb *iocb;
+       struct timespec tmo;
+       static struct io_event events[NUM_EVENTS];
+       static char buf[IORTX_SIZE];
+
+       for (i = 0; i < IORTX_SIZE; i++)
+               buf[i] = i & 0xff;
+       n = range / IORTX_SIZE;
+       iocb = malloc(n * sizeof(struct iocb));
+       piocb = malloc(n * sizeof(struct iocb *));
+       if (!iocb || !piocb) {
+               perror("iocb alloc");
+               return -1;
+       }
+       for (i = 0; i < n; i++) {
+               piocb[i] = &iocb[i];
+               asyio_prep_pwrite(&iocb[i], fd, buf, sizeof(buf),
+                                 (n - i - 1) * IORTX_SIZE, afd);
+               iocb[i].aio_data = (u_int64_t) i + 1;
+       }
+       fprintf(stdout, "submitting write request ...\n");
+       if (io_submit(ctx, n, piocb) <= 0) {
+               perror("io_submit");
+               return -1;
+       }
+       for (i = 0; i < n;) {
+               fprintf(stdout, "waiting ... ");
+               waitasync(afd, -1);
+               eval = 0;
+               if (read(afd, &eval, sizeof(eval)) != sizeof(eval))
+                       perror("read");
+               fprintf(stdout, "done! %llu\n", (unsigned long long) eval);
+               while (eval > 0) {
+                       tmo.tv_sec = 0;
+                       tmo.tv_nsec = 0;
+                       r = io_getevents(ctx, 1, eval > NUM_EVENTS ? NUM_EVENTS: (long) eval,
+                                        events, &tmo);
+                       if (r > 0) {
+                               for (j = 0; j < r; j++) {
+
+                               }
+                               i += r;
+                               eval -= r;
+                               fprintf(stdout, "test_write got %ld/%ld results so far\n",
+                                       i, n);
+                       }
+               }
+       }
+       free(iocb);
+       free(piocb);
+
+       return n;
+}
+
+int main(int ac, char **av) {
+       int afd, fd;
+       aio_context_t ctx = 0;
+       char const *testfn = "/tmp/eventfd-aio-test.data";
+
+       fprintf(stdout, "creating an eventfd ...\n");
+       if ((afd = eventfd(0)) == -1) {
+               perror("eventfd");
+               return 2;
+       }
+       fprintf(stdout, "done! eventfd = %d\n", afd);
+       if (io_setup(TESTFILE_SIZE / IORTX_SIZE + 256, &ctx)) {
+               perror("io_setup");
+               return 3;
+       }
+       if ((fd = open(testfn, O_RDWR | O_CREAT, 0644)) == -1) {
+               perror(testfn);
+               return 4;
+       }
+       ftruncate(fd, TESTFILE_SIZE);
+
+       fcntl(afd, F_SETFL, fcntl(afd, F_GETFL, 0) | O_NONBLOCK);
+
+       test_write(ctx, fd, TESTFILE_SIZE, afd);
+       test_read(ctx, fd, TESTFILE_SIZE, afd);
+
+       io_destroy(ctx);
+       close(fd);
+       close(afd);
+       remove(testfn);
+
+       return 0;
+}
+
diff --git a/lib/tevent/eventfd-aio-test.h b/lib/tevent/eventfd-aio-test.h
new file mode 100644 (file)
index 0000000..d74d51c
--- /dev/null
@@ -0,0 +1,213 @@
+/*
+ *  eventfd-aio-test by Davide Libenzi (test app for eventfd hooked into KAIO)
+ *  Copyright (C) 2007  Davide Libenzi
+ *
+ *  This program is free software; you can redistribute it and/or modify
+ *  it under the terms of the GNU General Public License as published by
+ *  the Free Software Foundation; either version 2 of the License, or
+ *  (at your option) any later version.
+ *
+ *  This program is distributed in the hope that it will be useful,
+ *  but WITHOUT ANY WARRANTY; without even the implied warranty of
+ *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ *  GNU General Public License for more details.
+ *
+ *  You should have received a copy of the GNU General Public License
+ *  along with this program; if not, write to the Free Software
+ *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+ *
+ *  Davide Libenzi <davidel@xmailserver.org>
+ *
+ */
+
+#define _GNU_SOURCE
+#include <sys/syscall.h>
+#include <sys/types.h>
+#include <sys/signal.h>
+#include <sys/time.h>
+#include <sys/uio.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+#include <unistd.h>
+#include <signal.h>
+#include <poll.h>
+#include <fcntl.h>
+#include <time.h>
+#include <errno.h>
+
+
+/*
+ * This were good at the time of 2.6.21-rc5.mm4 ...
+ */
+#ifndef __NR_eventfd
+#if defined(__x86_64__)
+#define __NR_eventfd 283
+#elif defined(__i386__)
+#define __NR_eventfd 323
+#else
+#error Cannot detect your architecture!
+#endif
+#endif
+
+
+
+#define TESTFILE_SIZE (4096 * 5120)
+#define IORTX_SIZE (1024 * 4)
+#define NUM_EVENTS 128
+
+
+
+typedef unsigned long aio_context_t;
+
+enum {
+       IOCB_CMD_PREAD = 0,
+               IOCB_CMD_PWRITE = 1,
+               IOCB_CMD_FSYNC = 2,
+               IOCB_CMD_FDSYNC = 3,
+               /* These two are experimental.
+                * IOCB_CMD_PREADX = 4,
+                * IOCB_CMD_POLL = 5,
+                */
+               IOCB_CMD_NOOP = 6,
+               IOCB_CMD_PREADV = 7,
+               IOCB_CMD_PWRITEV = 8,
+};
+
+#if defined(__LITTLE_ENDIAN)
+#define PADDED(x,y)    x, y
+#elif defined(__BIG_ENDIAN)
+#define PADDED(x,y)    y, x
+#else
+#error edit for your odd byteorder.
+#endif
+
+#define IOCB_FLAG_RESFD                (1 << 0)
+
+/*
+ * we always use a 64bit off_t when communicating
+ * with userland.  its up to libraries to do the
+ * proper padding and aio_error abstraction
+ */
+struct iocb {
+       /* these are internal to the kernel/libc. */
+       u_int64_t       aio_data;       /* data to be returned in event's data */
+       u_int32_t       PADDED(aio_key, aio_reserved1);
+       /* the kernel sets aio_key to the req # */
+
+       /* common fields */
+       u_int16_t       aio_lio_opcode; /* see IOCB_CMD_ above */
+       int16_t aio_reqprio;
+       u_int32_t       aio_fildes;
+
+       u_int64_t       aio_buf;
+       u_int64_t       aio_nbytes;
+       int64_t aio_offset;
+
+       /* extra parameters */
+       u_int64_t       aio_reserved2;  /* TODO: use this for a (struct sigevent *) */
+
+       u_int32_t       aio_flags;
+       /*
+        * If different from 0, this is an eventfd to deliver AIO results to
+        */
+       u_int32_t       aio_resfd;
+}; /* 64 bytes */
+
+struct io_event {
+       u_int64_t           data;           /* the data field from the iocb */
+       u_int64_t           obj;            /* what iocb this event came from */
+       int64_t           res;            /* result code for this event */
+       int64_t           res2;           /* secondary result */
+};
+
+
+
+
+
+static void asyio_prep_preadv(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+                      int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PREADV;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) iov;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+static void asyio_prep_pwritev(struct iocb *iocb, int fd, struct iovec *iov, int nr_segs,
+                       int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PWRITEV;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) iov;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+static void asyio_prep_pread(struct iocb *iocb, int fd, void *buf, int nr_segs,
+                     int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PREAD;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) buf;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+static void asyio_prep_pwrite(struct iocb *iocb, int fd, void const *buf, int nr_segs,
+                      int64_t offset, int afd)
+{
+       memset(iocb, 0, sizeof(*iocb));
+       iocb->aio_fildes = fd;
+       iocb->aio_lio_opcode = IOCB_CMD_PWRITE;
+       iocb->aio_reqprio = 0;
+       iocb->aio_buf = (u_int64_t) buf;
+       iocb->aio_nbytes = nr_segs;
+       iocb->aio_offset = offset;
+       iocb->aio_flags = IOCB_FLAG_RESFD;
+       iocb->aio_resfd = afd;
+}
+
+static long io_setup(unsigned nr_reqs, aio_context_t *ctx) {
+
+       return syscall(__NR_io_setup, nr_reqs, ctx);
+}
+
+static long io_destroy(aio_context_t ctx) {
+
+       return syscall(__NR_io_destroy, ctx);
+}
+
+static long io_submit(aio_context_t ctx, long n, struct iocb **paiocb) {
+
+       return syscall(__NR_io_submit, ctx, n, paiocb);
+}
+
+static long io_cancel(aio_context_t ctx, struct iocb *aiocb, struct io_event *res) {
+
+       return syscall(__NR_io_cancel, ctx, aiocb, res);
+}
+
+static long io_getevents(aio_context_t ctx, long min_nr, long nr, struct io_event *events,
+                 struct timespec *tmo) {
+
+       return syscall(__NR_io_getevents, ctx, min_nr, nr, events, tmo);
+}
+
+static int eventfd(int count) {
+
+       return syscall(__NR_eventfd, count);
+}
diff --git a/lib/tevent/tevent_aio2.c b/lib/tevent/tevent_aio2.c
new file mode 100644 (file)
index 0000000..62ac9fd
--- /dev/null
@@ -0,0 +1,543 @@
+/* 
+   Unix SMB/CIFS implementation.
+
+   main select loop and event handling - aio/epoll hybrid implementation
+
+   Copyright (C) Andrew Tridgell       2006
+
+   based on events_standard.c
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 3 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program.  If not, see <http://www.gnu.org/licenses/>.
+*/
+/*
+  this is a very strange beast. The Linux AIO implementation doesn't
+  yet integrate properly with epoll, but there is a kernel patch that
+  allows the aio wait primitives to be used to wait for epoll events,
+  and this can be used to give us a unified event system incorporating
+  both aio events and epoll events
+
+  this is _very_ experimental code
+*/
+
+#include "replace.h"
+#include "system/filesys.h"
+#include "system/select.h"
+#include "tevent.h"
+#include "tevent_internal.h"
+#include "tevent_util.h"
+
+#include "eventsfd-aio-test.h"
+
+struct aio_event_context {
+       /* a pointer back to the generic event_context */
+       struct tevent_context *ev;
+
+       uint32_t destruction_count;
+
+       int eventfd;
+
+       io_context_t ioctx;
+
+       struct epoll_event epevent[MAX_AIO_QUEUE_DEPTH];
+
+       struct iocb *epoll_iocb;
+
+       int epoll_fd;
+       int is_epoll_set;
+       pid_t pid;
+};
+
+struct tevent_aio {
+       struct tevent_context *event_ctx;
+       struct iocb iocb;
+       void *private_data;
+       tevent_aio_handler_t handler;
+};
+
+/*
+  map from EVENT_FD_* to EPOLLIN/EPOLLOUT
+*/
+static uint32_t epoll_map_flags(uint16_t flags)
+{
+       uint32_t ret = 0;
+       if (flags & TEVENT_FD_READ) ret |= (EPOLLIN | EPOLLERR | EPOLLHUP);
+       if (flags & TEVENT_FD_WRITE) ret |= (EPOLLOUT | EPOLLERR | EPOLLHUP);
+       return ret;
+}
+
+/*
+ free the epoll fd
+*/
+static int aio_ctx_destructor(struct aio_event_context *aio_ev)
+{
+       io_queue_release(aio_ev->ioctx);
+       close(aio_ev->epoll_fd);
+       aio_ev->epoll_fd = -1;
+       return 0;
+}
+
+static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde);
+
+/*
+  reopen the epoll handle when our pid changes
+  see http://junkcode.samba.org/ftp/unpacked/junkcode/epoll_fork.c for an 
+  demonstration of why this is needed
+ */
+static void epoll_check_reopen(struct aio_event_context *aio_ev)
+{
+       struct tevent_fd *fde;
+
+       if (aio_ev->pid == getpid()) {
+               return;
+       }
+
+       close(aio_ev->epoll_fd);
+       aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
+       if (aio_ev->epoll_fd == -1) {
+               tevent_debug(aio_ev->ev, TEVENT_DEBUG_FATAL,
+                            "Failed to recreate epoll handle after fork\n");
+               return;
+       }
+       aio_ev->pid = getpid();
+       for (fde=aio_ev->ev->fd_events;fde;fde=fde->next) {
+               epoll_add_event(aio_ev, fde);
+       }
+}
+
+#define EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT     (1<<0)
+#define EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR  (1<<1)
+#define EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR     (1<<2)
+
+/*
+ add the epoll event to the given fd_event
+*/
+static void epoll_add_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+       struct epoll_event event;
+       if (aio_ev->epoll_fd == -1) return;
+
+       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+       /* if we don't want events yet, don't add an aio_event */
+       if (fde->flags == 0) return;
+
+       memset(&event, 0, sizeof(event));
+       event.events = epoll_map_flags(fde->flags);
+       event.data.ptr = fde;
+       epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_ADD, fde->fd, &event);
+       fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+
+       /* only if we want to read we want to tell the event handler about errors */
+       if (fde->flags & TEVENT_FD_READ) {
+               fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+       }
+}
+
+/*
+ delete the epoll event for given fd_event
+*/
+static void epoll_del_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+       struct epoll_event event;
+
+       if (aio_ev->epoll_fd == -1) return;
+
+       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+       /* if there's no aio_event, we don't need to delete it */
+       if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT)) return;
+
+       ZERO_STRUCT(event);
+       event.events = epoll_map_flags(fde->flags);
+       event.data.ptr = fde;
+       epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_DEL, fde->fd, &event);
+
+       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT;
+}
+
+/*
+ change the epoll event to the given fd_event
+*/
+static void epoll_mod_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+       struct epoll_event event;
+       if (aio_ev->epoll_fd == -1) return;
+
+       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+       ZERO_STRUCT(event);
+       event.events = epoll_map_flags(fde->flags);
+       event.data.ptr = fde;
+       epoll_ctl(aio_ev->epoll_fd, EPOLL_CTL_MOD, fde->fd, &event);
+
+       /* only if we want to read we want to tell the event handler about errors */
+       if (fde->flags & TEVENT_FD_READ) {
+               fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+       }
+}
+
+static void epoll_change_event(struct aio_event_context *aio_ev, struct tevent_fd *fde)
+{
+       bool got_error = (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR);
+       bool want_read = (fde->flags & TEVENT_FD_READ);
+       bool want_write= (fde->flags & TEVENT_FD_WRITE);
+
+       if (aio_ev->epoll_fd == -1) return;
+
+       fde->additional_flags &= ~EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR;
+
+       /* there's already an event */
+       if (fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_HAS_EVENT) {
+               if (want_read || (want_write && !got_error)) {
+                       epoll_mod_event(aio_ev, fde);
+                       return;
+               }
+               epoll_del_event(aio_ev, fde);
+               return;
+       }
+
+       /* there's no aio_event attached to the fde */
+       if (want_read || (want_write && !got_error)) {
+               epoll_add_event(aio_ev, fde);
+               return;
+       }
+}
+
+static int setup_epoll_wait(struct aio_event_context *aio_ev)
+{
+       if (aio_ev->is_epoll_set) {
+               return 0;
+       }
+       memset(aio_ev->epoll_iocb, 0, sizeof(*aio_ev->epoll_iocb));
+       aio_ev->epoll_iocb->aio_fildes = aio_ev->epoll_fd;
+       aio_ev->epoll_iocb->aio_lio_opcode = IOCB_CMD_EPOLL_WAIT;
+       aio_ev->epoll_iocb->aio_reqprio = 0;
+
+       aio_ev->epoll_iocb->u.c.nbytes = MAX_AIO_QUEUE_DEPTH;
+       aio_ev->epoll_iocb->u.c.offset = -1;
+       aio_ev->epoll_iocb->u.c.buf = aio_ev->epevent;
+
+       if (io_submit(aio_ev->ioctx, 1, &aio_ev->epoll_iocb) != 1) {
+               return -1;
+       }
+       aio_ev->is_epoll_set = 1;
+
+       return 0;
+}
+
+
+/*
+  event loop handling using aio/epoll hybrid
+*/
+static int aio_event_loop(struct aio_event_context *aio_ev, struct timeval *tvalp)
+{
+       int ret, i;
+       uint32_t destruction_count = ++aio_ev->destruction_count;
+       struct timespec timeout;
+       struct io_event events[8];
+
+       if (aio_ev->epoll_fd == -1) return -1;
+
+       if (aio_ev->ev->signal_events &&
+           tevent_common_check_signal(aio_ev->ev)) {
+               return 0;
+       }
+
+       if (tvalp) {
+               timeout.tv_sec = tvalp->tv_sec;
+               timeout.tv_nsec = tvalp->tv_usec;
+               timeout.tv_nsec *= 1000;
+       }
+
+       if (setup_epoll_wait(aio_ev) < 0) 
+               return -1;
+
+       ret = io_getevents(aio_ev->ioctx, 1, 8,
+                          events, tvalp?&timeout:NULL);
+
+       if (ret == -EINTR) {
+               if (aio_ev->ev->signal_events) {
+                       tevent_common_check_signal(aio_ev->ev);
+               }
+               return 0;
+       }
+
+       if (ret == 0 && tvalp) {
+               /* we don't care about a possible delay here */
+               tevent_common_loop_timer_delay(aio_ev->ev);
+               return 0;
+       }
+
+       for (i=0;i<ret;i++) {
+               struct io_event *event = &events[i];
+               struct iocb *finished = event->obj;
+
+               switch (finished->aio_lio_opcode) {
+               case IO_CMD_PWRITE:
+               case IO_CMD_PREAD: {
+                       struct tevent_aio *ae = talloc_get_type(finished->data, 
+                                                              struct tevent_aio);
+                       if (ae) {
+                               talloc_set_destructor(ae, NULL);
+                               ae->handler(ae->event_ctx, ae, 
+                                           event->res, ae->private_data);
+                               talloc_free(ae);
+                       }
+                       break;
+               }
+               case IOCB_CMD_EPOLL_WAIT: {
+                       struct epoll_event *ep = (struct epoll_event *)finished->u.c.buf;
+                       struct tevent_fd *fde;
+                       uint16_t flags = 0;
+                       int j;
+
+                       aio_ev->is_epoll_set = 0;
+
+                       for (j=0; j<event->res; j++, ep++) {
+                               fde = talloc_get_type(ep->data.ptr, 
+                                                     struct tevent_fd);
+                               if (fde == NULL) {
+                                       return -1;
+                               }
+                               if (ep->events & (EPOLLHUP|EPOLLERR)) {
+                                       fde->additional_flags |= EPOLL_ADDITIONAL_FD_FLAG_GOT_ERROR;
+                                       if (!(fde->additional_flags & EPOLL_ADDITIONAL_FD_FLAG_REPORT_ERROR)) {
+                                               epoll_del_event(aio_ev, fde);
+                                               continue;
+                                       }
+                                       flags |= TEVENT_FD_READ;
+                               }
+                               if (ep->events & EPOLLIN) flags |= TEVENT_FD_READ;
+                               if (ep->events & EPOLLOUT) flags |= TEVENT_FD_WRITE;
+                               if (flags) {
+                                       fde->handler(aio_ev->ev, fde, flags, fde->private_data);
+                               }
+                       }
+                       break;
+               }
+               }
+               if (destruction_count != aio_ev->destruction_count) {
+                       return 0;
+               }
+       }
+
+       return 0;
+}
+
+/*
+  create a aio_event_context structure.
+*/
+static int aio_event_context_init(struct tevent_context *ev)
+{
+       struct aio_event_context *aio_ev;
+       
+       aio_ev = talloc_zero(ev, struct aio_event_context);
+       if (!aio_ev) return -1;
+
+       aio_ev->ev = ev;
+       aio_ev->epoll_iocb = talloc(aio_ev, struct iocb);
+
+       if (io_queue_init(MAX_AIO_QUEUE_DEPTH, &aio_ev->ioctx) != 0) {
+               talloc_free(aio_ev);
+               return -1;
+       }
+
+       aio_ev->epoll_fd = epoll_create(MAX_AIO_QUEUE_DEPTH);
+       if (aio_ev->epoll_fd == -1) {
+               talloc_free(aio_ev);
+               return -1;
+       }
+       aio_ev->pid = getpid();
+
+       talloc_set_destructor(aio_ev, aio_ctx_destructor);
+
+       ev->additional_data = aio_ev;
+
+       if (setup_epoll_wait(aio_ev) < 0) {
+               talloc_free(aio_ev);
+               return -1;
+       }
+
+       return 0;
+}
+
+/*
+  destroy an fd_event
+*/
+static int aio_event_fd_destructor(struct tevent_fd *fde)
+{
+       struct tevent_context *ev = fde->event_ctx;
+       struct aio_event_context *aio_ev = NULL;
+
+       if (ev) {
+               aio_ev = talloc_get_type(ev->additional_data,
+                                        struct aio_event_context);
+
+               epoll_check_reopen(aio_ev);
+
+               aio_ev->destruction_count++;
+
+               epoll_del_event(aio_ev, fde);
+       }
+
+       return tevent_common_fd_destructor(fde);
+}
+
+/*
+  add a fd based event
+  return NULL on failure (memory allocation error)
+*/
+static struct tevent_fd *aio_event_add_fd(struct tevent_context *ev, TALLOC_CTX *mem_ctx,
+                                         int fd, uint16_t flags,
+                                         tevent_fd_handler_t handler,
+                                         void *private_data,
+                                         const char *handler_name,
+                                         const char *location)
+{
+       struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+                                                          struct aio_event_context);
+       struct tevent_fd *fde;
+
+       epoll_check_reopen(aio_ev);
+
+       fde = tevent_common_add_fd(ev, mem_ctx, fd, flags,
+                                  handler, private_data,
+                                  handler_name, location);
+       if (!fde) return NULL;
+
+       talloc_set_destructor(fde, aio_event_fd_destructor);
+
+       epoll_add_event(aio_ev, fde);
+
+       return fde;
+}
+
+/*
+  set the fd event flags
+*/
+static void aio_event_set_fd_flags(struct tevent_fd *fde, uint16_t flags)
+{
+       struct tevent_context *ev;
+       struct aio_event_context *aio_ev;
+
+       if (fde->flags == flags) return;
+
+       ev = fde->event_ctx;
+       aio_ev = talloc_get_type(ev->additional_data, struct aio_event_context);
+
+       fde->flags = flags;
+
+       epoll_check_reopen(aio_ev);
+
+       epoll_change_event(aio_ev, fde);
+}
+
+/*
+  do a single event loop using the events defined in ev 
+*/
+static int aio_event_loop_once(struct tevent_context *ev)
+{
+       struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+                                                          struct aio_event_context);
+       struct timeval tval;
+
+       tval = tevent_common_loop_timer_delay(ev);
+       if (ev_timeval_is_zero(&tval)) {
+               return 0;
+       }
+
+       epoll_check_reopen(aio_ev);
+
+       return aio_event_loop(aio_ev, &tval);
+}
+
+/*
+  return on failure or (with 0) if all fd events are removed
+*/
+static int aio_event_loop_wait(struct tevent_context *ev)
+{
+       struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+                                                          struct aio_event_context);
+       while (aio_ev->ev->fd_events) {
+               if (aio_event_loop_once(ev) != 0) {
+                       break;
+               }
+       }
+
+       return 0;
+}
+
+/*
+  called when a disk IO event needs to be cancelled
+*/
+static int aio_destructor(struct tevent_aio *ae)
+{
+       struct tevent_context *ev = ae->event_ctx;
+       struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+                                                          struct aio_event_context);
+       struct io_event result;
+       io_cancel(aio_ev->ioctx, &ae->iocb, &result);
+       /* TODO: handle errors from io_cancel()! */
+       return 0;
+}
+
+/* submit an aio disk IO event */
+static struct tevent_aio *aio_event_add_aio(struct tevent_context *ev, 
+                                           TALLOC_CTX *mem_ctx,
+                                           struct iocb *iocb,
+                                           tevent_aio_handler_t handler,
+                                           void *private_data,
+                                           const char *handler_name,
+                                           const char *location)
+{
+       struct aio_event_context *aio_ev = talloc_get_type(ev->additional_data,
+                                                          struct aio_event_context);
+       struct iocb *iocbp;
+       struct tevent_aio *ae = talloc(mem_ctx?mem_ctx:ev, struct tevent_aio);
+       if (ae == NULL) return NULL;
+
+       ae->event_ctx    = ev;
+       ae->iocb         = *iocb;
+       ae->handler      = handler;
+       ae->private_data = private_data;
+       iocbp = &ae->iocb;
+
+       if (io_submit(aio_ev->ioctx, 1, &iocbp) != 1) {
+               talloc_free(ae);
+               return NULL;
+       }
+       ae->iocb.data = ae;
+       talloc_set_destructor(ae, aio_destructor);
+
+       return ae;
+}
+
+static const struct tevent_ops aio_event_ops = {
+       .context_init   = aio_event_context_init,
+       .add_fd         = aio_event_add_fd,
+       .add_aio        = aio_event_add_aio,
+       .set_fd_close_fn= tevent_common_fd_set_close_fn,
+       .get_fd_flags   = tevent_common_fd_get_flags,
+       .set_fd_flags   = aio_event_set_fd_flags,
+       .add_timer      = tevent_common_add_timer,
+       .add_signal     = tevent_common_add_signal,
+       .loop_once      = aio_event_loop_once,
+       .loop_wait      = aio_event_loop_wait,
+};
+
+bool tevent_aio_init(void)
+{
+       return tevent_register_backend("aio", &aio_event_ops);
+}
+
index e90dd5ef82682dfb2c351aca4589f5f923bc178b..98f83ab34b8e200633634e294f178b46187b3fe9 100644 (file)
@@ -240,6 +240,23 @@ struct tevent_signal {
        void *additional_data;
 };
 
+struct tevent_aio {
+       struct tevent_aio *prev, *next;
+       struct tevent_context *event_ctx;
+       int fd;
+       uint8_t *buffer;
+       uint64_t length;
+       uint64_t offset;
+       tevent_aio_handler_t handler;
+       /* this is private for the specific handler */
+       void *private_data;
+       /* this is for debugging only! */
+       const char *handler_name;
+       const char *location;
+       /* this is private for the events_ops implementation */
+       void *additional_data;
+};
+
 struct tevent_threaded_context {
        struct tevent_threaded_context *next, *prev;
 
@@ -292,6 +309,9 @@ struct tevent_context {
        pthread_mutex_t scheduled_mutex;
        struct tevent_immediate *scheduled_immediates;
 
+       /* list of aio events - used by common code */
+       struct tevent_aio *aio_events;
+
        /* this is private for the events_ops implementation */
        void *additional_data;
 
@@ -302,6 +322,9 @@ struct tevent_context {
        int wakeup_read_fd;
 #endif
 
+       /* aio eventfd() */
+       struct tevent_fd *pipe_fde;
+
        /* debugging operations */
        struct tevent_debug_ops debug_ops;