lib: Add messaging_dgm
authorVolker Lendecke <vl@samba.org>
Mon, 24 Feb 2014 12:23:49 +0000 (12:23 +0000)
committerJeremy Allison <jra@samba.org>
Wed, 23 Apr 2014 20:33:08 +0000 (22:33 +0200)
Messaging based on unix domain datagram sockets

This makes every process participating in messaging bind on a unix domain
datagram socket, similar to the source4 based messaging. The details are a bit
different though:

Retry after EWOULDBLOCK is done with a blocking thread, not by polling. This
was the only way I could in experiments avoid a thundering herd or high load
under Linux in extreme overload situations like many thousands of processes
sending to one blocked process. If there are better ideas to do this in a
simple way, I'm more than happy to remove the pthreadpool dependency again.

There is only one socket per process, not per task. I don't think that per-task
sockets are really necessary, we can do filtering in user space. The message
contains the destination server_id, which contains the destination task_id. I
think we can rebase the source4 based imessaging on top of this, allowing
multiple imessaging contexts on top of one messaging_context. I had planned to
do this conversion before this goes in, but Jeremy convinced me that this has
value in itself :-)

Per socket we also create a fcntl-based lockfile to allow race-free cleanup of
orphaned sockets. This lockfile contains the unique_id, which in the future
will make the server_id.tdb obsolete.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/include/messages.h
source3/lib/messages.c
source3/lib/messages_dgm.c [new file with mode: 0644]
source3/smbd/server.c
source3/wscript_build

index 47c5f7a2d9d510d27f3d0f1a333dfee6f2296531..94379657242f382777a77c503583e5e615b68615 100644 (file)
@@ -91,6 +91,11 @@ struct messaging_backend {
        void *private_data;
 };
 
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+                           TALLOC_CTX *mem_ctx,
+                           struct messaging_backend **presult);
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid);
+
 NTSTATUS messaging_tdb_init(struct messaging_context *msg_ctx,
                            TALLOC_CTX *mem_ctx,
                            struct messaging_backend **presult);
index 4ff933dc6e7a02c44d685b7615f9fc3915d591d7..983fe699ed96f9a558729af8e76bb9d7eba0a14b 100644 (file)
@@ -197,10 +197,10 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
        ctx->id = procid_self();
        ctx->event_ctx = ev;
 
-       status = messaging_tdb_init(ctx, ctx, &ctx->local);
+       status = messaging_dgm_init(ctx, ctx, &ctx->local);
 
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(2, ("messaging_tdb_init failed: %s\n",
+               DEBUG(2, ("messaging_dgm_init failed: %s\n",
                          nt_errstr(status)));
                TALLOC_FREE(ctx);
                return NULL;
@@ -245,9 +245,9 @@ NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
 
        msg_ctx->id = procid_self();
 
-       status = messaging_tdb_init(msg_ctx, msg_ctx, &msg_ctx->local);
+       status = messaging_dgm_init(msg_ctx, msg_ctx, &msg_ctx->local);
        if (!NT_STATUS_IS_OK(status)) {
-               DEBUG(0, ("messaging_tdb_init failed: %s\n",
+               DEBUG(0, ("messaging_dgm_init failed: %s\n",
                          nt_errstr(status)));
                return status;
        }
diff --git a/source3/lib/messages_dgm.c b/source3/lib/messages_dgm.c
new file mode 100644 (file)
index 0000000..8327f9d
--- /dev/null
@@ -0,0 +1,409 @@
+/*
+ * Unix SMB/CIFS implementation.
+ * Samba internal messaging functions
+ * Copyright (C) 2013 by Volker Lendecke
+ *
+ * This program is free software; you can redistribute it and/or modify
+ * it under the terms of the GNU General Public License as published by
+ * the Free Software Foundation; either version 3 of the License, or
+ * (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with this program.  If not, see <http://www.gnu.org/licenses/>.
+ */
+
+#include "includes.h"
+#include "lib/util/data_blob.h"
+#include "lib/util/debug.h"
+#include "lib/unix_msg/unix_msg.h"
+#include "system/filesys.h"
+#include "messages.h"
+#include "lib/param/param.h"
+#include "poll_funcs/poll_funcs_tevent.h"
+#include "unix_msg/unix_msg.h"
+#include "librpc/gen_ndr/messaging.h"
+
+struct messaging_dgm_context {
+       struct messaging_context *msg_ctx;
+       struct poll_funcs msg_callbacks;
+       struct unix_msg_ctx *dgm_ctx;
+       char *cache_dir;
+       int lockfile_fd;
+};
+
+struct messaging_dgm_hdr {
+       uint32_t msg_version;
+       enum messaging_type msg_type;
+       struct server_id dst;
+       struct server_id src;
+};
+
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+                                  struct server_id pid, int msg_type,
+                                  const DATA_BLOB *data,
+                                  struct messaging_backend *backend);
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+                              uint8_t *msg, size_t msg_len,
+                              void *private_data);
+
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c);
+
+static int messaging_dgm_lockfile_create(const char *cache_dir, pid_t pid,
+                                        int *plockfile_fd, uint64_t unique)
+{
+       char buf[PATH_MAX];
+       char *dir, *to_free;
+       ssize_t dirlen;
+       char *lockfile_name;
+       int lockfile_fd;
+       struct flock lck = {};
+       int unique_len, ret;
+       ssize_t written;
+       bool ok;
+
+       dirlen = full_path_tos(cache_dir, "lck", buf, sizeof(buf),
+                              &dir, &to_free);
+       if (dirlen == -1) {
+               return ENOMEM;
+       }
+
+       ok = directory_create_or_exist_strict(dir, sec_initial_uid(), 0755);
+       if (!ok) {
+               ret = errno;
+               DEBUG(1, ("%s: Could not create lock directory: %s\n",
+                         __func__, strerror(ret)));
+               TALLOC_FREE(to_free);
+               return ret;
+       }
+
+       lockfile_name = talloc_asprintf(talloc_tos(), "%s/%u", dir,
+                                       (unsigned)pid);
+       TALLOC_FREE(to_free);
+       if (lockfile_name == NULL) {
+               DEBUG(1, ("%s: talloc_asprintf failed\n", __func__));
+               return ENOMEM;
+       }
+
+       /* no O_EXCL, existence check is via the fcntl lock */
+
+       lockfile_fd = open(lockfile_name, O_NONBLOCK|O_CREAT|O_WRONLY, 0644);
+       if (lockfile_fd == -1) {
+               ret = errno;
+               DEBUG(1, ("%s: open failed: %s\n", __func__, strerror(errno)));
+               goto fail_free;
+       }
+
+       lck.l_type = F_WRLCK;
+       lck.l_whence = SEEK_SET;
+       lck.l_start = 0;
+       lck.l_len = 0;
+
+       ret = fcntl(lockfile_fd, F_SETLK, &lck);
+       if (ret == -1) {
+               ret = errno;
+               DEBUG(1, ("%s: fcntl failed: %s\n", __func__, strerror(ret)));
+               goto fail_close;
+       }
+
+       unique_len = snprintf(buf, sizeof(buf), "%"PRIu64, unique);
+
+       /* shorten a potentially preexisting file */
+
+       ret = ftruncate(lockfile_fd, unique_len);
+       if (ret == -1) {
+               ret = errno;
+               DEBUG(1, ("%s: ftruncate failed: %s\n", __func__,
+                         strerror(ret)));
+               goto fail_unlink;
+       }
+
+       written = write(lockfile_fd, buf, unique_len);
+       if (written != unique_len) {
+               ret = errno;
+               DEBUG(1, ("%s: write failed: %s\n", __func__, strerror(ret)));
+               goto fail_unlink;
+       }
+
+       *plockfile_fd = lockfile_fd;
+       return 0;
+
+fail_unlink:
+       unlink(lockfile_name);
+fail_close:
+       close(lockfile_fd);
+fail_free:
+       TALLOC_FREE(lockfile_name);
+       return ret;
+}
+
+static int messaging_dgm_lockfile_remove(const char *cache_dir, pid_t pid)
+{
+       fstring fname;
+       char buf[PATH_MAX];
+       char *lockfile_name, *to_free;
+       ssize_t len;
+       int ret;
+
+       fstr_sprintf(fname, "lck/%u", (unsigned)pid);
+
+       len = full_path_tos(cache_dir, fname, buf, sizeof(buf),
+                           &lockfile_name, &to_free);
+       if (len == -1) {
+               return ENOMEM;
+       }
+
+       ret = unlink(lockfile_name);
+       if (ret == -1) {
+               ret = errno;
+               DEBUG(10, ("%s: unlink failed: %s\n", __func__,
+                          strerror(ret)));
+       }
+       TALLOC_FREE(to_free);
+       return ret;
+}
+
+NTSTATUS messaging_dgm_init(struct messaging_context *msg_ctx,
+                           TALLOC_CTX *mem_ctx,
+                           struct messaging_backend **presult)
+{
+       struct messaging_backend *result;
+       struct messaging_dgm_context *ctx;
+       struct server_id pid = messaging_server_id(msg_ctx);
+       int ret;
+       bool ok;
+       const char *cache_dir;
+       char *socket_dir, *socket_name;
+       uint64_t cookie;
+
+       cache_dir = lp_cache_directory();
+       if (cache_dir == NULL) {
+               NTSTATUS status = map_nt_error_from_unix(errno);
+               return status;
+       }
+
+       result = talloc(mem_ctx, struct messaging_backend);
+       if (result == NULL) {
+               goto fail_nomem;
+       }
+       ctx = talloc_zero(result, struct messaging_dgm_context);
+       if (ctx == NULL) {
+               goto fail_nomem;
+       }
+
+       result->private_data = ctx;
+       result->send_fn = messaging_dgm_send;
+       ctx->msg_ctx = msg_ctx;
+
+       ctx->cache_dir = talloc_strdup(ctx, cache_dir);
+       if (ctx->cache_dir == NULL) {
+               goto fail_nomem;
+       }
+       socket_dir = talloc_asprintf(ctx, "%s/msg", cache_dir);
+       if (socket_dir == NULL) {
+               goto fail_nomem;
+       }
+       socket_name = talloc_asprintf(ctx, "%s/%u", socket_dir,
+                                     (unsigned)pid.pid);
+       if (socket_name == NULL) {
+               goto fail_nomem;
+       }
+
+       sec_init();
+
+       ret = messaging_dgm_lockfile_create(cache_dir, pid.pid,
+                                           &ctx->lockfile_fd, pid.unique_id);
+       if (ret != 0) {
+               DEBUG(1, ("%s: messaging_dgm_create_lockfile failed: %s\n",
+                         __func__, strerror(ret)));
+               TALLOC_FREE(result);
+               return map_nt_error_from_unix(ret);
+       }
+
+       poll_funcs_init_tevent(&ctx->msg_callbacks, msg_ctx->event_ctx);
+
+       ok = directory_create_or_exist_strict(socket_dir, sec_initial_uid(),
+                                             0700);
+       if (!ok) {
+               DEBUG(1, ("Could not create socket directory\n"));
+               TALLOC_FREE(result);
+               return NT_STATUS_ACCESS_DENIED;
+       }
+       TALLOC_FREE(socket_dir);
+
+       unlink(socket_name);
+
+       generate_random_buffer((uint8_t *)&cookie, sizeof(cookie));
+
+       ret = unix_msg_init(socket_name, &ctx->msg_callbacks, 1024, cookie,
+                           messaging_dgm_recv, ctx, &ctx->dgm_ctx);
+       TALLOC_FREE(socket_name);
+       if (ret != 0) {
+               DEBUG(1, ("unix_msg_init failed: %s\n", strerror(ret)));
+               TALLOC_FREE(result);
+               return map_nt_error_from_unix(ret);
+       }
+       talloc_set_destructor(ctx, messaging_dgm_context_destructor);
+
+       *presult = result;
+       return NT_STATUS_OK;
+
+fail_nomem:
+       TALLOC_FREE(result);
+       return NT_STATUS_NO_MEMORY;
+}
+
+static int messaging_dgm_context_destructor(struct messaging_dgm_context *c)
+{
+       struct server_id pid = messaging_server_id(c->msg_ctx);
+
+       /*
+        * First delete the socket to avoid races. The lockfile is the
+        * indicator that we're still around.
+        */
+       unix_msg_free(c->dgm_ctx);
+
+       if (getpid() == pid.pid) {
+               (void)messaging_dgm_lockfile_remove(c->cache_dir, pid.pid);
+       }
+       close(c->lockfile_fd);
+       return 0;
+}
+
+static NTSTATUS messaging_dgm_send(struct messaging_context *msg_ctx,
+                                  struct server_id pid, int msg_type,
+                                  const DATA_BLOB *data,
+                                  struct messaging_backend *backend)
+{
+       struct messaging_dgm_context *ctx = talloc_get_type_abort(
+               backend->private_data, struct messaging_dgm_context);
+       fstring pid_str;
+       char buf[PATH_MAX];
+       char *dst_sock, *to_free;
+       struct messaging_dgm_hdr hdr;
+       struct iovec iov[2];
+       ssize_t pathlen;
+       int ret;
+
+       fstr_sprintf(pid_str, "msg/%u", (unsigned)pid.pid);
+
+       pathlen = full_path_tos(ctx->cache_dir, pid_str, buf, sizeof(buf),
+                               &dst_sock, &to_free);
+       if (pathlen == -1) {
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       hdr.msg_version = MESSAGE_VERSION;
+       hdr.msg_type = msg_type & MSG_TYPE_MASK;
+       hdr.dst = pid;
+       hdr.src = msg_ctx->id;
+
+       DEBUG(10, ("%s: Sending message 0x%x len %u to %s\n", __func__,
+                  (unsigned)hdr.msg_type, (unsigned)data->length,
+                  server_id_str(talloc_tos(), &pid)));
+
+       iov[0].iov_base = &hdr;
+       iov[0].iov_len = sizeof(hdr);
+       iov[1].iov_base = data->data;
+       iov[1].iov_len = data->length;
+
+       become_root();
+       ret = unix_msg_send(ctx->dgm_ctx, dst_sock, iov, ARRAY_SIZE(iov));
+       unbecome_root();
+
+       TALLOC_FREE(to_free);
+
+       if (ret != 0) {
+               return map_nt_error_from_unix(ret);
+       }
+       return NT_STATUS_OK;
+}
+
+static void messaging_dgm_recv(struct unix_msg_ctx *ctx,
+                              uint8_t *msg, size_t msg_len,
+                              void *private_data)
+{
+       struct messaging_dgm_context *dgm_ctx = talloc_get_type_abort(
+               private_data, struct messaging_dgm_context);
+       struct messaging_dgm_hdr *hdr;
+       struct messaging_rec rec;
+
+       if (msg_len < sizeof(*hdr)) {
+               DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
+               return;
+       }
+
+       /*
+        * unix_msg guarantees alignment, so we can cast here
+        */
+       hdr = (struct messaging_dgm_hdr *)msg;
+
+       rec.msg_version = hdr->msg_version;
+       rec.msg_type = hdr->msg_type;
+       rec.dest = hdr->dst;
+       rec.src = hdr->src;
+       rec.buf.data = msg + sizeof(*hdr);
+       rec.buf.length = msg_len - sizeof(*hdr);
+
+       DEBUG(10, ("%s: Received message 0x%x len %u from %s\n", __func__,
+                  (unsigned)hdr->msg_type, (unsigned)rec.buf.length,
+                  server_id_str(talloc_tos(), &rec.src)));
+
+       messaging_dispatch_rec(dgm_ctx->msg_ctx, &rec);
+}
+
+NTSTATUS messaging_dgm_cleanup(struct messaging_context *msg_ctx, pid_t pid)
+{
+       struct messaging_dgm_context *ctx = talloc_get_type_abort(
+               msg_ctx->local->private_data, struct messaging_dgm_context);
+       char *lockfile_name, *socket_name;
+       int fd, ret;
+       struct flock lck = {};
+       NTSTATUS status = NT_STATUS_OK;
+
+       lockfile_name = talloc_asprintf(talloc_tos(), "%s/lck/%u",
+                                       ctx->cache_dir, (unsigned)pid);
+       if (lockfile_name == NULL) {
+               return NT_STATUS_NO_MEMORY;
+       }
+       socket_name = talloc_asprintf(lockfile_name, "%s/msg/%u",
+                                     ctx->cache_dir, (unsigned)pid);
+       if (socket_name == NULL) {
+               TALLOC_FREE(lockfile_name);
+               return NT_STATUS_NO_MEMORY;
+       }
+
+       fd = open(lockfile_name, O_NONBLOCK|O_WRONLY, 0);
+       if (fd == -1) {
+               status = map_nt_error_from_unix(errno);
+               DEBUG(10, ("%s: open(%s) failed: %s\n", __func__,
+                          lockfile_name, strerror(errno)));
+               return status;
+       }
+
+       lck.l_type = F_WRLCK;
+       lck.l_whence = SEEK_SET;
+       lck.l_start = 0;
+       lck.l_len = 0;
+
+       ret = fcntl(fd, F_SETLK, &lck);
+       if (ret != 0) {
+               status = map_nt_error_from_unix(errno);
+               DEBUG(10, ("%s: Could not get lock: %s\n", __func__,
+                          strerror(errno)));
+               TALLOC_FREE(lockfile_name);
+               close(fd);
+               return status;
+       }
+
+       (void)unlink(socket_name);
+       (void)unlink(lockfile_name);
+       (void)close(fd);
+
+       TALLOC_FREE(lockfile_name);
+       return NT_STATUS_OK;
+}
index 96580ba4e2e537ad5c0aeaf5aa52e43b12c546e6..5ff370d1edb801e481e25e6423c67a6d40472b8a 100644 (file)
@@ -465,6 +465,8 @@ static void remove_child_pid(struct smbd_parent_context *parent,
        }
 
        if (unclean_shutdown) {
+               NTSTATUS status;
+
                /* a child terminated uncleanly so tickle all
                   processes to see if they can grab any of the
                   pending locks
@@ -488,6 +490,10 @@ static void remove_child_pid(struct smbd_parent_context *parent,
                 * terminated uncleanly.
                 */
                messaging_cleanup_server(parent->msg_ctx, child_id);
+
+               status = messaging_dgm_cleanup(parent->msg_ctx, pid);
+               DEBUG(10, ("%s: messaging_dgm_cleanup returned %s\n",
+                          __func__, nt_errstr(status)));
        }
 
        if (!serverid_deregister(child_id)) {
index 4d261c645f01328c60c51a1ea98a4955f6fcdd50..2ba7a9687736a5d0b034b18fa32c62b4d8701877 100755 (executable)
@@ -314,6 +314,7 @@ bld.SAMBA3_SUBSYSTEM('TDB_LIB',
 bld.SAMBA3_SUBSYSTEM('samba3core',
                    source='''lib/messages.c
                    lib/messages_local.c
+                   lib/messages_dgm.c
                    lib/util_cluster.c
                    lib/id_cache.c
                    lib/talloc_dict.c
@@ -352,6 +353,8 @@ bld.SAMBA3_SUBSYSTEM('samba3core',
                         UTIL_PW
                         SAMBA_VERSION
                         PTHREADPOOL
+                        UNIX_MSG
+                        POLL_FUNCS_TEVENT
                         interfaces
                         param
                         dbwrap