lib: Add messaging_send_all
authorVolker Lendecke <vl@samba.org>
Sun, 5 Nov 2017 11:44:01 +0000 (12:44 +0100)
committerJeremy Allison <jra@samba.org>
Mon, 4 Dec 2017 23:56:13 +0000 (00:56 +0100)
This will replace message_send_all. With messaging_dgm_forall we have
a local broadcast mechanism, and ctdb can also broadcast
everywhere. So there's no need for a separate traverse/send mechanism.

There's no good error reporting mechanism for broadcasting, so make
this function void.

This drops the message_type filtering. I believe that this does not matter in
practice, since messaging is a lot cheaper with dgm instead of the old tdb
based messaging. If someone presents a use case where this matters, nowadays
I'd much rather extend the messaging_dgm lock file format (where the unique id
lives right now) with the filter bits.

Signed-off-by: Volker Lendecke <vl@samba.org>
Reviewed-by: Jeremy Allison <jra@samba.org>
source3/include/messages.h
source3/lib/messages.c

index 6a0340f5a7f16700d257315acb3b45b63773e32e..29c394af31753d183e2fedde5c9e022d469c6e5a 100644 (file)
@@ -96,6 +96,8 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
                            struct server_id server, uint32_t msg_type,
                            const struct iovec *iov, int iovlen,
                            const int *fds, size_t num_fds);
+void messaging_send_all(struct messaging_context *msg_ctx,
+                       int msg_type, const void *buf, size_t len);
 
 struct tevent_req *messaging_filtered_read_send(
        TALLOC_CTX *mem_ctx, struct tevent_context *ev,
index 01029b2d68b6ba958f4b3cfd923fb31741204148..a0a3f9fb1ba386274e5e6b9e2af83b0e3798c5b4 100644 (file)
 #include "lib/messages_ctdb_ref.h"
 #include "lib/messages_util.h"
 #include "cluster_support.h"
+#include "ctdbd_conn.h"
+#include "ctdb_srvids.h"
+
+#ifdef CLUSTER_SUPPORT
+#include "ctdb_protocol.h"
+#endif
 
 struct messaging_callback {
        struct messaging_callback *prev, *next;
@@ -513,6 +519,7 @@ static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
        }
        talloc_set_destructor(ctx, messaging_context_destructor);
 
+#ifdef CLUSTER_SUPPORT
        if (lp_clustering()) {
                ctx->msg_ctdb_ref = messaging_ctdb_ref(
                        ctx, ctx->event_ctx,
@@ -525,6 +532,8 @@ static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
                        goto done;
                }
        }
+#endif
+
        ctx->id.vnn = get_my_vnn();
 
        ctx->names_db = server_id_db_init(ctx,
@@ -836,6 +845,71 @@ NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
        return NT_STATUS_OK;
 }
 
+struct send_all_state {
+       struct messaging_context *msg_ctx;
+       int msg_type;
+       const void *buf;
+       size_t len;
+};
+
+static int send_all_fn(pid_t pid, void *private_data)
+{
+       struct send_all_state *state = private_data;
+       NTSTATUS status;
+
+       status = messaging_send_buf(state->msg_ctx, pid_to_procid(pid),
+                                   state->msg_type, state->buf, state->len);
+       if (!NT_STATUS_IS_OK(status)) {
+               DBG_WARNING("messaging_send_buf to %ju failed: %s\n",
+                           (uintmax_t)pid, nt_errstr(status));
+       }
+
+       return 0;
+}
+
+void messaging_send_all(struct messaging_context *msg_ctx,
+                       int msg_type, const void *buf, size_t len)
+{
+       struct send_all_state state = {
+               .msg_ctx = msg_ctx, .msg_type = msg_type,
+               .buf = buf, .len = len
+       };
+       int ret;
+
+#ifdef CLUSTER_SUPPORT
+       if (lp_clustering()) {
+               struct ctdbd_connection *conn = messaging_ctdb_connection();
+               uint8_t msghdr[MESSAGE_HDR_LENGTH];
+               struct iovec iov[] = {
+                       { .iov_base = msghdr,
+                         .iov_len = sizeof(msghdr) },
+                       { .iov_base = discard_const_p(void, buf),
+                         .iov_len = len }
+               };
+
+               message_hdr_put(msghdr, msg_type, messaging_server_id(msg_ctx),
+                               (struct server_id) {0});
+
+               ret = ctdbd_messaging_send_iov(
+                       conn, CTDB_BROADCAST_CONNECTED,
+                       CTDB_SRVID_SAMBA_PROCESS,
+                       iov, ARRAY_SIZE(iov));
+               if (ret != 0) {
+                       DBG_WARNING("ctdbd_messaging_send_iov failed: %s\n",
+                                   strerror(ret));
+               }
+
+               return;
+       }
+#endif
+
+       ret = messaging_dgm_forall(send_all_fn, &state);
+       if (ret != 0) {
+               DBG_WARNING("messaging_dgm_forall failed: %s\n",
+                           strerror(ret));
+       }
+}
+
 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
                                               struct messaging_rec *rec)
 {