r3016: - converted the events code to talloc
authorAndrew Tridgell <tridge@samba.org>
Sun, 17 Oct 2004 10:04:49 +0000 (10:04 +0000)
committerGerald (Jerry) Carter <jerry@samba.org>
Wed, 10 Oct 2007 17:59:57 +0000 (12:59 -0500)
- added the new messaging system, based on unix domain sockets. It
  gets over 10k messages/second on my laptop without any socket
  cacheing, which is better than I expected.

- added a LOCAL-MESSAGING torture test
(This used to be commit 3af06478da7ab34a272226d8d9ac87e0a4940cfb)

15 files changed:
source4/configure.in
source4/lib/events.c
source4/lib/messaging/config.m4 [new file with mode: 0644]
source4/lib/messaging/config.mk [new file with mode: 0644]
source4/lib/messaging/messaging.c [new file with mode: 0644]
source4/lib/socket/socket_unix.c
source4/libcli/raw/clitransport.c
source4/librpc/rpc/dcerpc_tcp.c
source4/smbd/config.mk
source4/smbd/process_thread.c
source4/smbd/service.c
source4/smbd/service.h
source4/torture/config.mk
source4/torture/local/messaging.c [new file with mode: 0644]
source4/torture/torture.c

index 6bd8f9323c3eb0cca4381272b7455d4f99f36b70..703017ffc8183fc81c6fc3240358d5913758f84d 100644 (file)
@@ -15,6 +15,7 @@ SMB_INCLUDE_M4(lib/popt/config.m4)
 SMB_INCLUDE_M4(lib/iconv.m4)
 SMB_INCLUDE_M4(lib/basic.m4)
 SMB_INCLUDE_M4(lib/socket/config.m4)
+SMB_INCLUDE_M4(lib/messaging/config.m4)
 SMB_INCLUDE_M4(lib/tdb/config.m4)
 SMB_INCLUDE_M4(lib/ldb/config.m4)
 SMB_INCLUDE_M4(lib/cmdline/config.m4)
index 4ae7cad8af9c5c9d5cba9ec4d290065d35be3b01..0e98b425032d0555f289daf518e21c80e731a243 100644 (file)
   call, and all subsequent calls pass this event_context as the first
   element. Event handlers also receive this as their first argument.
 */
-struct event_context *event_context_init(void)
+struct event_context *event_context_init(TALLOC_CTX *mem_ctx)
 {
        struct event_context *ev;
 
-       ev = malloc(sizeof(*ev));
+       ev = talloc_p(mem_ctx, struct event_context);
        if (!ev) return NULL;
 
        /* start off with no events */
@@ -91,40 +91,12 @@ struct event_context *event_context_init(void)
 */
 void event_context_destroy(struct event_context *ev)
 {
-       struct fd_event *fde;
-       struct timed_event *te;
-       struct loop_event *le;
-
        ev->ref_count--;
        if (ev->ref_count != 0) {
                return;
        }
 
-       for (fde=ev->fd_events; fde;) {
-               struct fd_event *next = fde->next;
-               event_remove_fd(ev, fde);
-               if (fde->ref_count == 0) {
-                       free(fde);
-               }
-               fde=next;
-       }
-       for (te=ev->timed_events; te;) {
-               struct timed_event *next = te->next;
-               event_remove_timed(ev, te);
-               if (te->ref_count == 0) {
-                       free(te);
-               }
-               te=next;
-       }
-       for (le=ev->loop_events; le;) {
-               struct loop_event *next = le->next;
-               event_remove_loop(ev, le);
-               if (le->ref_count == 0) {
-                       free(le);
-               }
-               le=next;
-       }
-       free(ev);
+       talloc_free(ev);
 }
 
 
@@ -175,7 +147,7 @@ struct event_context * event_context_merge(struct event_context *ev, struct even
 */
 struct fd_event *event_add_fd(struct event_context *ev, struct fd_event *e) 
 {
-       e = memdup(e, sizeof(*e));
+       e = talloc_memdup(ev, e, sizeof(*e));
        if (!e) return NULL;
        DLIST_ADD(ev->fd_events, e);
        e->ref_count = 1;
@@ -245,7 +217,7 @@ void event_remove_fd_all_handler(struct event_context *ev, void *handler)
 */
 struct timed_event *event_add_timed(struct event_context *ev, struct timed_event *e) 
 {
-       e = memdup(e, sizeof(*e));
+       e = talloc_memdup(ev, e, sizeof(*e));
        if (!e) return NULL;
        e->ref_count = 1;
        DLIST_ADD(ev->timed_events, e);
@@ -274,7 +246,7 @@ BOOL event_remove_timed(struct event_context *ev, struct timed_event *e1)
 */
 struct loop_event *event_add_loop(struct event_context *ev, struct loop_event *e)
 {
-       e = memdup(e, sizeof(*e));
+       e = talloc_memdup(ev, e, sizeof(*e));
        if (!e) return NULL;
        e->ref_count = 1;
        DLIST_ADD(ev->loop_events, e);
@@ -330,7 +302,7 @@ int event_loop_once(struct event_context *ev)
                struct loop_event *next = le->next;
                if (le->ref_count == 0) {
                        DLIST_REMOVE(ev->loop_events, le);
-                       free(le);
+                       talloc_unlink(ev, le);
                } else {
                        le->ref_count++;
                        le->handler(ev, le, t);
@@ -351,7 +323,7 @@ int event_loop_once(struct event_context *ev)
                        if (ev->maxfd == fe->fd) {
                                ev->maxfd = EVENT_INVALID_MAXFD;
                        }
-                       free(fe);
+                       talloc_unlink(ev, fe);
                } else {
                        if (fe->flags & EVENT_FD_READ) {
                                FD_SET(fe->fd, &r_fds);
@@ -432,7 +404,7 @@ int event_loop_once(struct event_context *ev)
                struct timed_event *next = te->next;
                if (te->ref_count == 0) {
                        DLIST_REMOVE(ev->timed_events, te);
-                       free(te);
+                       talloc_unlink(ev, te);
                } else if (te->next_event <= t) {
                        te->ref_count++;
                        te->handler(ev, te, t);
diff --git a/source4/lib/messaging/config.m4 b/source4/lib/messaging/config.m4
new file mode 100644 (file)
index 0000000..1797069
--- /dev/null
@@ -0,0 +1 @@
+SMB_SUBSYSTEM_MK(MESSAGING,lib/messaging/config.mk)
diff --git a/source4/lib/messaging/config.mk b/source4/lib/messaging/config.mk
new file mode 100644 (file)
index 0000000..6c30894
--- /dev/null
@@ -0,0 +1,8 @@
+
+################################################
+# Start SUBSYSTEM MESSAGING
+[SUBSYSTEM::MESSAGING]
+INIT_OBJ_FILES = \
+               lib/messaging/messaging.o
+# End SUBSYSTEM MESSAGING
+################################################
diff --git a/source4/lib/messaging/messaging.c b/source4/lib/messaging/messaging.c
new file mode 100644 (file)
index 0000000..f9caf50
--- /dev/null
@@ -0,0 +1,445 @@
+/* 
+   Unix SMB/CIFS implementation.
+
+   Samba internal messaging functions
+
+   Copyright (C) Andrew Tridgell 2004
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+
+/* change the message version with any incompatible changes in the protocol */
+#define MESSAGING_VERSION 1
+
+struct messaging_state {
+       servid_t server_id;
+       struct socket_context *sock;
+       char *path;
+       struct dispatch_fn *dispatch;
+
+       struct {
+               struct event_context *ev;
+               struct fd_event *fde;
+       } event;
+};
+
+/* we have a linked list of dispatch handlers that this messaging
+   server can deal with */
+struct dispatch_fn {
+       struct dispatch_fn *next, *prev;
+       uint32_t msg_type;
+       void *private;
+       void (*fn)(void *msg_ctx, void *private, 
+                  uint32_t msg_type, servid_t server_id, DATA_BLOB *data);
+};
+
+/* an individual message */
+struct messaging_rec {
+       struct messaging_rec *next, *prev;
+
+       struct messaging_state *msg;
+       struct socket_context *sock;
+       struct fd_event *fde;
+       const char *path;
+
+       struct {
+               uint32_t version;
+               uint32_t msg_type;
+               servid_t from;
+               servid_t to;
+               uint32_t length;
+       } header;
+
+       DATA_BLOB data;
+
+       uint32_t ndone;
+};
+
+/*
+ A useful function for testing the message system.
+*/
+static void ping_message(void *msg_ctx, void *private, 
+                        uint32_t msg_type, servid_t src, DATA_BLOB *data)
+{
+       DEBUG(1,("INFO: Received PING message from server %u [%.*s]\n",
+                (uint_t)src, data->length, data->data?(const char *)data->data:""));
+       messaging_send(msg_ctx, src, MSG_PONG, data);
+}
+
+/* 
+   return the path to a messaging socket
+*/
+static char *messaging_path(TALLOC_CTX *mem_ctx, servid_t server_id)
+{
+       char *name = talloc_asprintf(mem_ctx, "messaging/msg.%u", (unsigned)server_id);
+       char *ret;
+       ret = lock_path(mem_ctx, name);
+       talloc_free(name);
+       return ret;
+}
+
+/*
+  dispatch a fully received message
+*/
+static void messaging_dispatch(struct messaging_state *msg, struct messaging_rec *rec)
+{
+       struct dispatch_fn *d;
+       for (d=msg->dispatch;d;d=d->next) {
+               if (d->msg_type == rec->header.msg_type) {
+                       d->fn(msg, d->private, d->msg_type, rec->header.from, &rec->data);
+               }
+       }
+
+       /* we don't free the record itself here as there may
+          be more messages from this client */
+       data_blob_free(&rec->data);
+       rec->header.length = 0;
+       rec->ndone = 0;
+}
+
+
+/*
+  handle IO for a single message
+*/
+static void messaging_recv_handler(struct event_context *ev, struct fd_event *fde, 
+                                time_t t, uint16_t flags)
+{
+       struct messaging_rec *rec = fde->private;
+       struct messaging_state *msg = rec->msg;
+       NTSTATUS status;
+
+       if (rec->ndone < sizeof(rec->header)) {
+               /* receive the header */
+               DATA_BLOB blob;
+               status = socket_recv(rec->sock, rec, 
+                                    &blob, sizeof(rec->header) - rec->ndone, 0);
+               if (NT_STATUS_IS_ERR(status)) {
+                       talloc_free(rec);
+                       return;
+               }
+
+               if (blob.length == 0) {
+                       return;
+               }
+
+               memcpy(rec->ndone + (char *)&rec->header, blob.data, blob.length);
+               rec->ndone += blob.length;
+               data_blob_free(&blob);
+
+               if (rec->ndone == sizeof(rec->header)) {
+                       if (rec->header.version != MESSAGING_VERSION) {
+                               DEBUG(0,("meessage with wrong version %u\n",
+                                        rec->header.version));
+                               talloc_free(rec);
+                       }
+                       rec->data = data_blob_talloc(rec, NULL, rec->header.length);
+                       if (rec->data.length != rec->header.length) {
+                               DEBUG(0,("Unable to allocate message of size %u\n",
+                                        rec->header.length));
+                               talloc_free(rec);
+                       }
+               }
+       }
+
+       if (rec->ndone >= sizeof(rec->header) && 
+           rec->ndone < sizeof(rec->header) + rec->header.length) {
+               /* receive the body, if any */
+               DATA_BLOB blob;
+               status = socket_recv(rec->sock, rec, 
+                                    &blob, sizeof(rec->header) + rec->header.length - rec->ndone, 0);
+               if (NT_STATUS_IS_ERR(status)) {
+                       talloc_free(rec);
+                       return;
+               }
+
+               if (blob.length == 0) {
+                       return;
+               }
+
+               memcpy(rec->data.data + (rec->ndone - sizeof(rec->header)), 
+                      blob.data, blob.length);
+
+               rec->ndone += blob.length;
+       }
+
+       if (rec->ndone == sizeof(rec->header) + rec->header.length) {
+               /* we've got the whole message */
+               messaging_dispatch(msg, rec);
+       }
+}
+
+/*
+  destroy a messaging record
+*/
+static int rec_destructor(void *ptr)
+{
+       struct messaging_rec *rec = ptr;
+       struct messaging_state *msg = rec->msg;
+       event_remove_fd(msg->event.ev, rec->fde);
+       return 0;
+}
+
+/*
+  handle a new incoming connection
+*/
+static void messaging_listen_handler(struct event_context *ev, struct fd_event *fde, 
+                                    time_t t, uint16_t flags)
+{
+       struct messaging_state *msg = fde->private;
+       struct messaging_rec *rec;
+       NTSTATUS status;
+       struct fd_event fde2;
+
+       rec = talloc_p(msg, struct messaging_rec);
+       if (rec == NULL) {
+               smb_panic("Unable to allocate messaging_rec");
+       }
+
+       status = socket_accept(msg->sock, &rec->sock, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               smb_panic("Unable to accept messaging_rec");
+       }
+       talloc_steal(rec, rec->sock);
+
+       rec->msg = msg;
+       rec->ndone = 0;
+       rec->header.length = 0;
+       rec->path = msg->path;
+
+       fde2.private    = rec;
+       fde2.fd         = socket_get_fd(rec->sock);
+       fde2.flags      = EVENT_FD_READ;
+       fde2.handler    = messaging_recv_handler;
+
+       rec->fde        = event_add_fd(msg->event.ev, &fde2);
+
+       talloc_set_destructor(rec, rec_destructor);
+}
+
+/*
+  Register a dispatch function for a particular message type.
+*/
+void messaging_register(void *ctx, void *private,
+                       uint32_t msg_type, 
+                       void (*fn)(void *, void *, uint32_t, servid_t, DATA_BLOB *))
+{
+       struct messaging_state *msg = ctx;
+       struct dispatch_fn *d;
+
+       d = talloc_p(msg, struct dispatch_fn);
+       d->msg_type = msg_type;
+       d->private = private;
+       d->fn = fn;
+       DLIST_ADD(msg->dispatch, d);
+}
+
+/*
+  De-register the function for a particular message type.
+*/
+void messaging_deregister(void *ctx, uint32_t msg_type)
+{
+       struct messaging_state *msg = ctx;
+       struct dispatch_fn *d, *next;
+
+       for (d = msg->dispatch; d; d = next) {
+               next = d->next;
+               if (d->msg_type == msg_type) {
+                       DLIST_REMOVE(msg->dispatch, d);
+                       talloc_free(d);
+               }
+       }       
+}
+
+
+
+/*
+  handle IO for sending a message
+*/
+static void messaging_send_handler(struct event_context *ev, struct fd_event *fde, 
+                                  time_t t, uint16_t flags)
+{
+       struct messaging_rec *rec = fde->private;
+       NTSTATUS status;
+
+       if (rec->ndone < sizeof(rec->header)) {
+               /* send the header */
+               size_t nsent;
+               DATA_BLOB blob;
+
+               blob.data = rec->ndone + (char *)&rec->header;
+               blob.length = sizeof(rec->header) - rec->ndone;
+
+               status = socket_send(rec->sock, rec, &blob, &nsent, 0);
+               if (NT_STATUS_IS_ERR(status)) {
+                       talloc_free(rec);
+                       return;
+               }
+
+               if (nsent == 0) {
+                       return;
+               }
+
+               rec->ndone += nsent;
+       }
+
+       if (rec->ndone >= sizeof(rec->header) && 
+           rec->ndone < sizeof(rec->header) + rec->header.length) {
+               /* send the body, if any */
+               DATA_BLOB blob;
+               size_t nsent;
+
+               blob.data = rec->data.data + (rec->ndone - sizeof(rec->header));
+               blob.length = rec->header.length - (rec->ndone - sizeof(rec->header));
+
+               status = socket_send(rec->sock, rec, &blob, &nsent, 0);
+               if (NT_STATUS_IS_ERR(status)) {
+                       talloc_free(rec);
+                       return;
+               }
+
+               rec->ndone += nsent;
+       }
+
+       if (rec->ndone == sizeof(rec->header) + rec->header.length) {
+               /* we've done the whole message */
+               talloc_free(rec);
+       }
+}
+
+
+/*
+  Send a message to a particular server
+*/
+NTSTATUS messaging_send(void *msg_ctx, servid_t server, uint32_t msg_type, DATA_BLOB *data)
+{
+       struct messaging_state *msg = msg_ctx;
+       struct messaging_rec *rec;
+       NTSTATUS status;
+       struct fd_event fde;
+
+       rec = talloc_p(msg, struct messaging_rec);
+       if (rec == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       rec->msg = msg;
+       rec->header.version = MESSAGING_VERSION;
+       rec->header.msg_type = msg_type;
+       rec->header.from = msg->server_id;
+       rec->header.to = server;
+       rec->header.length = data?data->length:0;
+       if (rec->header.length != 0) {
+               rec->data = data_blob_talloc(rec, data->data, data->length);
+       } else {
+               rec->data = data_blob(NULL, 0);
+       }
+       rec->ndone = 0;
+
+       status = socket_create("unix", SOCKET_TYPE_STREAM, &rec->sock, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               talloc_free(rec);
+               return status;
+       }
+       talloc_steal(rec, rec->sock);
+
+       rec->path = messaging_path(rec, server);
+
+       status = socket_connect(rec->sock, NULL, 0, rec->path, 0, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               talloc_free(rec);
+               return status;
+       }
+
+       fde.private     = rec;
+       fde.fd          = socket_get_fd(rec->sock);
+       fde.flags       = EVENT_FD_WRITE;
+       fde.handler     = messaging_send_handler;
+
+       rec->fde        = event_add_fd(msg->event.ev, &fde);
+
+       talloc_set_destructor(rec, rec_destructor);
+
+       return NT_STATUS_OK;
+}
+
+
+/*
+  destroy the messaging context
+*/
+static int messaging_destructor(void *msg_ctx)
+{
+       struct messaging_state *msg = msg_ctx;
+       event_remove_fd(msg->event.ev, msg->event.fde);
+       return 0;
+}
+
+/*
+  create the listening socket and setup the dispatcher
+*/
+void *messaging_init(TALLOC_CTX *mem_ctx, servid_t server_id, struct event_context *ev)
+{
+       struct messaging_state *msg;
+       NTSTATUS status;
+       struct fd_event fde;
+
+       msg = talloc_p(mem_ctx, struct messaging_state);
+       if (msg == NULL) {
+               return NULL;
+       }
+
+       /* create the messaging directory if needed */
+       msg->path = lock_path(msg, "messaging");
+       mkdir(msg->path, 0700);
+       talloc_free(msg->path);
+
+       msg->server_id = server_id;
+       msg->dispatch = NULL;
+       msg->path = messaging_path(msg, server_id);
+
+       status = socket_create("unix", SOCKET_TYPE_STREAM, &msg->sock, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               talloc_free(msg);
+               return NULL;
+       }
+
+       /* by stealing here we ensure that the socket is cleaned up (and even 
+          deleted) on exit */
+       talloc_steal(msg, msg->sock);
+
+       status = socket_listen(msg->sock, msg->path, 0, 50, 0);
+       if (!NT_STATUS_IS_OK(status)) {
+               DEBUG(0,("Unable to setup messaging listener for '%s'\n", msg->path));
+               talloc_free(msg);
+               return NULL;
+       }
+
+       fde.private     = msg;
+       fde.fd          = socket_get_fd(msg->sock);
+       fde.flags       = EVENT_FD_READ;
+       fde.handler     = messaging_listen_handler;
+
+       msg->event.ev   = ev;
+       msg->event.fde  = event_add_fd(ev, &fde);
+
+       talloc_set_destructor(msg, messaging_destructor);
+       
+       messaging_register(msg, NULL, MSG_PING, ping_message);
+
+       return msg;
+}
+
+
index d87eaf49c4bbbba755be9863f292a1cbc1642634..7e169c47a7ef5e96540cca47f228dae90d822273 100644 (file)
@@ -29,6 +29,7 @@ static NTSTATUS unixdom_init(struct socket_context *sock)
        if (sock->fd == -1) {
                return NT_STATUS_INSUFFICIENT_RESOURCES;
        }
+       sock->private_data = NULL;
 
        return NT_STATUS_OK;
 }
@@ -36,6 +37,11 @@ static NTSTATUS unixdom_init(struct socket_context *sock)
 static void unixdom_close(struct socket_context *sock)
 {
        close(sock->fd);
+       /* if we were listening, then don't leave the socket lying
+          around in the filesystem */
+       if (sock->private_data) {
+               unlink((const char *)sock->private_data);
+       }
 }
 
 static NTSTATUS unixdom_connect(struct socket_context *sock,
@@ -82,6 +88,9 @@ static NTSTATUS unixdom_listen(struct socket_context *sock,
                return NT_STATUS_INVALID_PARAMETER;
        }
 
+       /* delete if it already exists */
+       unlink(my_address);
+
        ZERO_STRUCT(my_addr);
        my_addr.sun_family = AF_UNIX;
        strncpy(my_addr.sun_path, my_address, sizeof(my_addr.sun_path));
@@ -104,6 +113,7 @@ static NTSTATUS unixdom_listen(struct socket_context *sock,
        }
 
        sock->state = SOCKET_STATE_SERVER_LISTEN;
+       sock->private_data = (void *)talloc_strdup(sock, my_address);
 
        return NT_STATUS_OK;
 }
index f06f2c57ff8849290132d1a1455fd3c64bacb4cf..3944afb638289b89228864b36a011fcf531ec2b5 100644 (file)
@@ -68,7 +68,7 @@ struct smbcli_transport *smbcli_transport_init(struct smbcli_socket *sock)
 
        ZERO_STRUCTP(transport);
 
-       transport->event.ctx = event_context_init();
+       transport->event.ctx = event_context_init(transport);
        if (transport->event.ctx == NULL) {
                talloc_free(transport);
                return NULL;
index e28e939a7aa6322f155a707b91a14cf4f7126375..130f20a86161d0d7bda754fc55d193098718d87f 100644 (file)
@@ -360,7 +360,7 @@ NTSTATUS dcerpc_pipe_open_tcp(struct dcerpc_pipe **p,
 
        tcp->fd = fd;
        tcp->server_name = talloc_strdup((*p), server);
-       tcp->event_ctx = event_context_init();
+       tcp->event_ctx = event_context_init(tcp);
        tcp->pending_send = NULL;
        tcp->recv.received = 0;
        tcp->recv.data = data_blob(NULL, 0);
index 5aa04725b4452c115d3bf691043162a1e7beba1d..55f040f8050c4e3a2b75fee0d72c8bba2f325063 100644 (file)
@@ -37,6 +37,8 @@ REQUIRED_SUBSYSTEMS = \
 [SUBSYSTEM::SERVER_SERVICE]
 INIT_OBJ_FILES = \
                smbd/service.o
+REQUIRED_SUBSYSTEMS = \
+               MESSAGING
 # End SUBSYSTEM SERVER
 #######################
 
index 85f30c9ddd773fd2ece5ade99a9c0bed0f25abbd..108b098b8a7fce3ec271a23880349771344bfb98 100644 (file)
@@ -72,7 +72,7 @@ static void thread_accept_connection(struct event_context *ev, struct fd_event *
           main event_context is continued.
        */
 
-       ev = event_context_init();
+       ev = event_context_init(server_socket);
        if (!ev) {
                DEBUG(0,("thread_accept_connection: failed to create event_context!\n"));
                socket_destroy(sock);
@@ -87,6 +87,7 @@ static void thread_accept_connection(struct event_context *ev, struct fd_event *
                return;
        }
 
+       talloc_steal(conn, ev);
        talloc_steal(conn, sock);
 
        /* TODO: is this MUTEX_LOCK in the right place here?
index 9a7ac7355915c03c74df54ffdd5ebc3f9dba98fb..d4ba9c990c191046bae8d6d35064dbdbc28c7644 100644 (file)
@@ -48,7 +48,7 @@ struct server_context *server_service_startup(const char *model)
 
        ZERO_STRUCTP(srv_ctx);
 
-       srv_ctx->events = event_context_init();
+       srv_ctx->events = event_context_init(srv_ctx);
        if (!srv_ctx->events) {
                DEBUG(0,("event_context_init() failed\n"));
                return NULL;    
@@ -247,6 +247,9 @@ struct server_connection *server_setup_connection(struct event_context *ev,
                return NULL;
        }
 
+       /* setup to receive internal messages on this connection */
+       srv_conn->messaging_ctx = messaging_init(srv_conn, srv_conn->server_id, ev);
+
        return srv_conn;
 }
 
index e9ef0bff06e832c65924c1b8be8243334cd2698e..93f24a34f96674e259715885a398b61eb0c2044c 100644 (file)
@@ -124,6 +124,8 @@ struct server_connection {
        struct server_socket *server_socket;
 
        struct server_service *service;
+
+       void *messaging_ctx;
 };
 
 #endif /* _SERVER_SERVICE_H */
index 81988c5275dc4826ac77d614563ad8ae61fb69dc..9967696c24b8ad888d155b0cb75e44a52f67b5de 100644 (file)
@@ -102,9 +102,11 @@ REQUIRED_SUBSYSTEMS = \
 [SUBSYSTEM::TORTURE_LOCAL]
 ADD_OBJ_FILES = \
                torture/local/iconv.o \
-               torture/local/talloc.o
+               torture/local/talloc.o \
+               torture/local/messaging.o
 REQUIRED_SUBSYSTEMS = \
-               LIBSMB
+               LIBSMB \
+               MESSAGING
 # End SUBSYSTEM TORTURE_LOCAL
 #################################
 
diff --git a/source4/torture/local/messaging.c b/source4/torture/local/messaging.c
new file mode 100644 (file)
index 0000000..7302e3c
--- /dev/null
@@ -0,0 +1,92 @@
+/* 
+   Unix SMB/CIFS implementation.
+
+   local test for messaging code
+
+   Copyright (C) Andrew Tridgell 2004
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+
+static void pong_message(void *msg_ctx, void *private, 
+                        uint32_t msg_type, servid_t src, DATA_BLOB *data)
+{
+       int *count = private;
+       (*count)++;
+}
+
+/*
+  test ping speed
+*/
+static BOOL test_ping_speed(TALLOC_CTX *mem_ctx)
+{
+       struct event_context *ev = event_context_init(mem_ctx);
+       void *msg_ctx1, *msg_ctx2;
+       int ping_count = 0;
+       int pong_count = 0;
+       BOOL ret = True;
+
+       msg_ctx1 = messaging_init(mem_ctx, 1, ev);
+       msg_ctx2 = messaging_init(mem_ctx, 2, ev);
+
+       messaging_register(msg_ctx2, &pong_count, MSG_PONG, pong_message);
+
+       start_timer();
+
+       printf("Sending pings for 10 seconds\n");
+       while (end_timer() < 10.0) {
+               DATA_BLOB data;
+               data.data = "testing";
+               data.length = strlen(data.data);
+
+               messaging_send(msg_ctx2, 1, MSG_PING, &data);
+               messaging_send(msg_ctx2, 1, MSG_PING, NULL);
+               ping_count += 2;
+               event_loop_once(ev);
+               event_loop_once(ev);
+       }
+
+       printf("waiting for %d remaining replies\n", ping_count - pong_count);
+       while (end_timer() < 30 && pong_count < ping_count) {
+               event_loop_once(ev);
+       }
+
+       if (ping_count != pong_count) {
+               printf("ping test failed! received %d, sent %d\n", pong_count, ping_count);
+               ret = False;
+       }
+
+       printf("ping rate of %.0f messages/sec\n", (ping_count+pong_count)/end_timer());
+
+       talloc_free(msg_ctx1);
+       talloc_free(msg_ctx2);
+
+       event_context_destroy(ev);
+       return ret;
+}
+
+BOOL torture_local_messaging(int dummy) 
+{
+       TALLOC_CTX *mem_ctx = talloc_init("torture_local_messaging");
+       BOOL ret = True;
+
+       ret &= test_ping_speed(mem_ctx);
+
+       talloc_free(mem_ctx);
+
+       return True;
+}
index 3ce40ffb4a2eacecfa2cad272ad51003f407530b..6b5d9ebd2eeff0b33225bd00a613b6d3e80d9df2 100644 (file)
@@ -3430,6 +3430,7 @@ static struct {
        {"LOCAL-NTLMSSP", torture_ntlmssp_self_check, 0},
        {"LOCAL-ICONV", torture_local_iconv, 0},
        {"LOCAL-TALLOC", torture_local_talloc, 0},
+       {"LOCAL-MESSAGING", torture_local_messaging, 0},
 
        /* ldap testers */
        {"LDAP-BASIC", torture_ldap_basic, 0},