#include "../lib/util/tevent_ntstatus.h"
#include "lib/param/param.h"
#include "lib/util/server_id_db.h"
+#include "../source3/lib/messages_dgm.h"
+#include "../source3/lib/messages_dgm_ref.h"
+#include "../source3/lib/messages_util.h"
#include <tdb.h>
/* change the message version with any incompatible changes in the protocol */
};
struct imessaging_context {
+ struct imessaging_context *prev, *next;
struct server_id server_id;
- struct socket_context *sock;
- const char *base_path;
- const char *path;
+ const char *sock_dir;
+ const char *lock_dir;
struct dispatch_fn **dispatch;
uint32_t num_types;
struct idr_context *dispatch_tree;
struct idr_context *idr;
struct server_id_db *names;
struct timeval start_time;
- struct tevent_timer *retry_te;
- struct {
- struct tevent_fd *fde;
- } event;
+ void *msg_dgm_ref;
};
/* we have a linked list of dispatch handlers for each msg_type that
return NT_STATUS_OK;
}
-/*
- return the path to a messaging socket
-*/
-static char *imessaging_path(struct imessaging_context *msg, struct server_id server_id)
-{
- struct server_id_buf buf;
-
- return talloc_asprintf(msg, "%s/msg.%s", msg->base_path,
- server_id_str_buf(server_id, &buf));
-}
-
-/*
- dispatch a fully received message
-
- note that this deliberately can match more than one message handler
- per message. That allows a single messasging context to register
- (for example) a debug handler for more than one piece of code
-*/
-static void imessaging_dispatch(struct imessaging_context *msg, struct imessaging_rec *rec)
+static struct dispatch_fn *imessaging_find_dispatch(
+ struct imessaging_context *msg, uint32_t msg_type)
{
- struct dispatch_fn *d, *next;
-
/* temporary IDs use an idtree, the rest use a array of pointers */
- if (rec->header->msg_type >= MSG_TMP_BASE) {
- d = (struct dispatch_fn *)idr_find(msg->dispatch_tree,
- rec->header->msg_type);
- } else if (rec->header->msg_type < msg->num_types) {
- d = msg->dispatch[rec->header->msg_type];
- } else {
- d = NULL;
- }
-
- for (; d; d = next) {
- DATA_BLOB data;
- next = d->next;
- data.data = rec->packet.data + sizeof(*rec->header);
- data.length = rec->header->length;
- d->fn(msg, d->private_data, d->msg_type, rec->header->from, &data);
- }
- rec->header->length = 0;
-}
-
-/*
- handler for messages that arrive from other nodes in the cluster
-*/
-static void cluster_message_handler(struct imessaging_context *msg, DATA_BLOB packet)
-{
- struct imessaging_rec *rec;
-
- rec = talloc(msg, struct imessaging_rec);
- if (rec == NULL) {
- smb_panic("Unable to allocate imessaging_rec");
- }
-
- rec->msg = msg;
- rec->path = msg->path;
- rec->header = (struct imessaging_header *)packet.data;
- rec->packet = packet;
- rec->retries = 0;
-
- if (packet.length != sizeof(*rec->header) + rec->header->length) {
- DEBUG(0,("messaging: bad message header size %d should be %d\n",
- rec->header->length, (int)(packet.length - sizeof(*rec->header))));
- talloc_free(rec);
- return;
- }
-
- imessaging_dispatch(msg, rec);
- talloc_free(rec);
-}
-
-
-
-/*
- try to send the message
-*/
-static NTSTATUS try_send(struct imessaging_rec *rec)
-{
- struct imessaging_context *msg = rec->msg;
- size_t nsent;
- void *priv;
- NTSTATUS status;
- struct socket_address *path;
-
- /* rec->path is the path of the *other* socket, where we want
- * this to end up */
- path = socket_address_from_strings(msg, msg->sock->backend_name,
- rec->path, 0);
- if (!path) {
- return NT_STATUS_NO_MEMORY;
- }
-
- /* we send with privileges so messages work from any context */
- priv = root_privileges();
- status = socket_sendto(msg->sock, &rec->packet, &nsent, path);
- talloc_free(path);
- talloc_free(priv);
-
- return status;
-}
-
-/*
- retry backed off messages
-*/
-static void msg_retry_timer(struct tevent_context *ev, struct tevent_timer *te,
- struct timeval t, void *private_data)
-{
- struct imessaging_context *msg = talloc_get_type(private_data,
- struct imessaging_context);
- msg->retry_te = NULL;
-
- /* put the messages back on the main queue */
- while (msg->retry_queue) {
- struct imessaging_rec *rec = msg->retry_queue;
- DLIST_REMOVE(msg->retry_queue, rec);
- DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
- }
-
- TEVENT_FD_WRITEABLE(msg->event.fde);
-}
-
-/*
- handle a socket write event
-*/
-static void imessaging_send_handler(struct imessaging_context *msg, struct tevent_context *ev)
-{
- while (msg->pending) {
- struct imessaging_rec *rec = msg->pending;
- NTSTATUS status;
- status = try_send(rec);
- if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
- rec->retries++;
- if (rec->retries > 3) {
- /* we're getting continuous write errors -
- backoff this record */
- DLIST_REMOVE(msg->pending, rec);
- DLIST_ADD_END(msg->retry_queue, rec,
- struct imessaging_rec *);
- if (msg->retry_te == NULL) {
- msg->retry_te =
- tevent_add_timer(ev, msg,
- timeval_current_ofs(1, 0),
- msg_retry_timer, msg);
- }
- }
- break;
- }
- rec->retries = 0;
- if (!NT_STATUS_IS_OK(status)) {
- TALLOC_CTX *tmp_ctx = talloc_new(msg);
- DEBUG(1,("messaging: Lost message from %s to %s of type %u - %s\n",
- server_id_str(tmp_ctx, &rec->header->from),
- server_id_str(tmp_ctx, &rec->header->to),
- rec->header->msg_type,
- nt_errstr(status)));
- talloc_free(tmp_ctx);
- }
- DLIST_REMOVE(msg->pending, rec);
- talloc_free(rec);
- }
- if (msg->pending == NULL) {
- TEVENT_FD_NOT_WRITEABLE(msg->event.fde);
- }
-}
-
-/*
- handle a new incoming packet
-*/
-static void imessaging_recv_handler(struct imessaging_context *msg, struct tevent_context *ev)
-{
- struct imessaging_rec *rec;
- NTSTATUS status;
- DATA_BLOB packet;
- size_t msize;
-
- /* see how many bytes are in the next packet */
- status = socket_pending(msg->sock, &msize);
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(0,("socket_pending failed in messaging - %s\n",
- nt_errstr(status)));
- return;
- }
-
- packet = data_blob_talloc(msg, NULL, msize);
- if (packet.data == NULL) {
- /* assume this is temporary and retry */
- return;
- }
-
- status = socket_recv(msg->sock, packet.data, msize, &msize);
- if (!NT_STATUS_IS_OK(status)) {
- data_blob_free(&packet);
- return;
- }
-
- if (msize < sizeof(*rec->header)) {
- DEBUG(0,("messaging: bad message of size %d\n", (int)msize));
- data_blob_free(&packet);
- return;
- }
-
- rec = talloc(msg, struct imessaging_rec);
- if (rec == NULL) {
- smb_panic("Unable to allocate imessaging_rec");
- }
-
- talloc_steal(rec, packet.data);
- rec->msg = msg;
- rec->path = msg->path;
- rec->header = (struct imessaging_header *)packet.data;
- rec->packet = packet;
- rec->retries = 0;
-
- if (msize != sizeof(*rec->header) + rec->header->length) {
- DEBUG(0,("messaging: bad message header size %d should be %d\n",
- rec->header->length, (int)(msize - sizeof(*rec->header))));
- talloc_free(rec);
- return;
- }
-
- imessaging_dispatch(msg, rec);
- talloc_free(rec);
-}
-
-
-/*
- handle a socket event
-*/
-static void imessaging_handler(struct tevent_context *ev, struct tevent_fd *fde,
- uint16_t flags, void *private_data)
-{
- struct imessaging_context *msg = talloc_get_type(private_data,
- struct imessaging_context);
- if (flags & TEVENT_FD_WRITE) {
- imessaging_send_handler(msg, ev);
+ if (msg_type >= MSG_TMP_BASE) {
+ return (struct dispatch_fn *)idr_find(msg->dispatch_tree,
+ msg_type);
}
- if (flags & TEVENT_FD_READ) {
- imessaging_recv_handler(msg, ev);
+ if (msg_type < msg->num_types) {
+ return msg->dispatch[msg_type];
}
+ return NULL;
}
-
/*
Register a dispatch function for a particular message type.
*/
NTSTATUS imessaging_send(struct imessaging_context *msg, struct server_id server,
uint32_t msg_type, const DATA_BLOB *data)
{
- struct imessaging_rec *rec;
- NTSTATUS status;
- size_t dlength = data?data->length:0;
+ uint8_t hdr[MESSAGE_HDR_LENGTH];
+ struct iovec iov[2];
+ int num_iov, ret;
+ pid_t pid;
+ void *priv;
- rec = talloc(msg, struct imessaging_rec);
- if (rec == NULL) {
- return NT_STATUS_NO_MEMORY;
+ if (!cluster_node_equal(&msg->server_id, &server)) {
+ /* No cluster in source4... */
+ return NT_STATUS_OK;
}
- rec->packet = data_blob_talloc(rec, NULL, sizeof(*rec->header) + dlength);
- if (rec->packet.data == NULL) {
- talloc_free(rec);
- return NT_STATUS_NO_MEMORY;
- }
+ message_hdr_put(hdr, msg_type, msg->server_id, server);
- rec->retries = 0;
- rec->msg = msg;
- rec->header = (struct imessaging_header *)rec->packet.data;
- /* zero padding */
- ZERO_STRUCTP(rec->header);
- rec->header->version = IMESSAGING_VERSION;
- rec->header->msg_type = msg_type;
- rec->header->from = msg->server_id;
- rec->header->to = server;
- rec->header->length = dlength;
- if (dlength != 0) {
- memcpy(rec->packet.data + sizeof(*rec->header),
- data->data, dlength);
- }
+ iov[0] = (struct iovec) { .iov_base = &hdr, .iov_len = sizeof(hdr) };
+ num_iov = 1;
- if (!cluster_node_equal(&msg->server_id, &server)) {
- /* the destination is on another node - dispatch via
- the cluster layer */
- status = cluster_message_send(server, &rec->packet);
- talloc_free(rec);
- return status;
+ if (data != NULL) {
+ iov[1] = (struct iovec) { .iov_base = data->data,
+ .iov_len = data->length };
+ num_iov += 1;
}
- rec->path = imessaging_path(msg, server);
- talloc_steal(rec, rec->path);
-
- if (msg->pending != NULL) {
- status = STATUS_MORE_ENTRIES;
- } else {
- status = try_send(rec);
+ pid = server.pid;
+ if (pid == 0) {
+ pid = getpid();
}
- if (NT_STATUS_EQUAL(status, STATUS_MORE_ENTRIES)) {
- if (msg->pending == NULL) {
- TEVENT_FD_WRITEABLE(msg->event.fde);
- }
- DLIST_ADD_END(msg->pending, rec, struct imessaging_rec *);
- return NT_STATUS_OK;
+ priv = root_privileges();
+ ret = messaging_dgm_send(pid, iov, num_iov, NULL, 0);
+ TALLOC_FREE(priv);
+ if (ret != 0) {
+ return map_nt_error_from_unix_common(ret);
}
-
- talloc_free(rec);
-
- return status;
+ return NT_STATUS_OK;
}
/*
if (!msg) {
return 0;
}
-
- DEBUG(5,("imessaging: cleaning up %s\n", msg->path));
- unlink(msg->path);
return 0;
}
+static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+ int *fds, size_t num_fds,
+ void *private_data);
+
/*
create the listening socket and setup the dispatcher
bool auto_remove)
{
struct imessaging_context *msg;
- NTSTATUS status;
- struct socket_address *path;
bool ok;
+ int ret;
if (ev == NULL) {
return NULL;
return NULL;
}
- /* setup a handler for messages from other cluster nodes, if appropriate */
- status = cluster_message_init(msg, server_id, cluster_message_handler);
- if (!NT_STATUS_IS_OK(status)) {
- goto fail;
- }
-
/* create the messaging directory if needed */
- msg->base_path = lpcfg_imessaging_path(msg, lp_ctx);
- if (msg->base_path == NULL) {
+ msg->sock_dir = lpcfg_private_path(msg, lp_ctx, "sock");
+ if (msg->sock_dir == NULL) {
+ goto fail;
+ }
+ ok = directory_create_or_exist_strict(msg->sock_dir, geteuid(), 0700);
+ if (!ok) {
goto fail;
}
- ok = directory_create_or_exist_strict(msg->base_path, geteuid(), 0700);
+ msg->lock_dir = lpcfg_lock_path(msg, lp_ctx, "msg");
+ if (msg->lock_dir == NULL) {
+ goto fail;
+ }
+ ok = directory_create_or_exist_strict(msg->lock_dir, geteuid(), 0755);
if (!ok) {
goto fail;
}
- msg->path = imessaging_path(msg, server_id);
- if (msg->path == NULL) {
+ msg->msg_dgm_ref = messaging_dgm_ref(
+ msg, ev, server_id.unique_id, msg->sock_dir, msg->lock_dir,
+ imessaging_dgm_recv, msg, &ret);
+
+ if (msg->msg_dgm_ref == NULL) {
goto fail;
}
msg->start_time = timeval_current();
msg->names = server_id_db_init(
- msg, server_id, msg->base_path, 0,
+ msg, server_id, msg->lock_dir, 0,
TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST|
lpcfg_tdb_flags(lp_ctx, 0));
if (msg->names == NULL) {
goto fail;
}
- status = socket_create("unix", SOCKET_TYPE_DGRAM, &msg->sock, 0);
- if (!NT_STATUS_IS_OK(status)) {
- goto fail;
- }
-
- /* by stealing here we ensure that the socket is cleaned up (and even
- deleted) on exit */
- talloc_steal(msg, msg->sock);
-
- path = socket_address_from_strings(msg, msg->sock->backend_name,
- msg->path, 0);
- if (!path) {
- goto fail;
- }
-
- status = socket_listen(msg->sock, path, 50, 0);
- if (!NT_STATUS_IS_OK(status)) {
- DEBUG(0,("Unable to setup messaging listener for '%s':%s\n", msg->path, nt_errstr(status)));
- goto fail;
- }
-
- /* it needs to be non blocking for sends */
- set_blocking(socket_get_fd(msg->sock), false);
-
- msg->event.fde = tevent_add_fd(ev, msg, socket_get_fd(msg->sock),
- TEVENT_FD_READ, imessaging_handler, msg);
- tevent_fd_set_auto_close(msg->event.fde);
-
if (auto_remove) {
talloc_set_destructor(msg, imessaging_cleanup);
}
return NULL;
}
+static void imessaging_dgm_recv(const uint8_t *buf, size_t buf_len,
+ int *fds, size_t num_fds,
+ void *private_data)
+{
+ struct imessaging_context *msg = talloc_get_type_abort(
+ private_data, struct imessaging_context);
+ uint32_t msg_type;
+ struct server_id src, dst;
+ struct server_id_buf srcbuf, dstbuf;
+ DATA_BLOB data;
+
+ if (buf_len < MESSAGE_HDR_LENGTH) {
+ /* Invalid message, ignore */
+ return;
+ }
+
+ message_hdr_get(&msg_type, &src, &dst, buf);
+
+ data.data = discard_const_p(uint8_t, buf + MESSAGE_HDR_LENGTH);
+ data.length = buf_len - MESSAGE_HDR_LENGTH;
+
+ if ((cluster_id_equal(&dst, &msg->server_id)) ||
+ ((dst.task_id == 0) && (msg->server_id.pid == 0))) {
+ struct dispatch_fn *d, *next;
+
+ DEBUG(10, ("%s: dst %s matches my id: %s, type=0x%x\n",
+ __func__,
+ server_id_str_buf(dst, &dstbuf),
+ server_id_str_buf(msg->server_id, &srcbuf),
+ (unsigned)msg_type));
+
+ d = imessaging_find_dispatch(msg, msg_type);
+
+ for (; d; d = next) {
+ next = d->next;
+ d->fn(msg, d->private_data, d->msg_type, src, &data);
+ }
+ } else {
+ DEBUG(10, ("%s: Ignoring type=0x%x dst %s, I am %s, \n",
+ __func__, (unsigned)msg_type,
+ server_id_str_buf(dst, &dstbuf),
+ server_id_str_buf(msg->server_id, &srcbuf)));
+ }
+}
+
/*
A hack, for the short term until we get 'client only' messaging in place
*/