From: Volker Lendecke Date: Mon, 21 May 2007 22:17:13 +0000 (+0000) Subject: r23055: Rewrite messages.c to use auto-generated marshalling in the tdb. I'm X-Git-Tag: build_3.2.7_ctdb.53~4720 X-Git-Url: http://git.samba.org/?a=commitdiff_plain;h=eaefd00563173dfabb7716c5695ac0a2f7139bb6;p=obnox%2Fsamba%2Fsamba-obnox.git r23055: Rewrite messages.c to use auto-generated marshalling in the tdb. I'm doing this because for the clustering the marshalling is needed in more than one place, so I wanted a decent routine to marshall a message_rec struct which was not there before. Tridge, this seems about the same speed as it used to be before, the librpc/ndr overhead in my tests was under the noise. Volker --- diff --git a/source/Makefile.in b/source/Makefile.in index fb5ced6d1dc..41316ac67b3 100644 --- a/source/Makefile.in +++ b/source/Makefile.in @@ -149,7 +149,7 @@ PATH_FLAGS = -DSMB_PASSWD_FILE=\"$(SMB_PASSWD_FILE)\" \ -DLOCKDIR=\"$(LOCKDIR)\" \ -DPIDDIR=\"$(PIDDIR)\" \ -DLIBDIR=\"$(LIBDIR)\" \ - -DLOGFILEBASE=\"$(LOGFILEBASE)\" \ + -DLOGFILEBASE=\"$(LOGFILEBASE)\" \ -DSHLIBEXT=\"@SHLIBEXT@\" \ -DCONFIGDIR=\"$(CONFIGDIR)\" @@ -259,7 +259,9 @@ SOCKET_WRAPPER_OBJ = @SOCKET_WRAPPER_OBJS@ TALLOC_OBJ = lib/talloc/talloc.o -LIB_WITHOUT_PROTO_OBJ = $(LIBREPLACE_OBJ) $(SOCKET_WRAPPER_OBJ) $(TALLOC_OBJ) + +LIB_WITHOUT_PROTO_OBJ = $(LIBREPLACE_OBJ) $(SOCKET_WRAPPER_OBJ) $(TALLOC_OBJ) \ + lib/messages.o librpc/gen_ndr/ndr_messaging.o LIB_WITH_PROTO_OBJ = $(VERSION_OBJ) lib/charcnv.o lib/debug.o lib/fault.o \ lib/interface.o lib/md4.o \ @@ -273,7 +275,7 @@ LIB_WITH_PROTO_OBJ = $(VERSION_OBJ) lib/charcnv.o lib/debug.o lib/fault.o \ lib/util_unistr.o lib/util_file.o lib/data_blob.o \ lib/util.o lib/util_sock.o lib/sock_exec.o lib/util_sec.o \ lib/substitute.o lib/fsusage.o \ - lib/ms_fnmatch.o lib/select.o lib/messages.o \ + lib/ms_fnmatch.o lib/select.o \ lib/tallocmsg.o lib/dmallocmsg.o libsmb/smb_signing.o \ lib/md5.o lib/hmacmd5.o lib/arc4.o lib/iconv.o \ nsswitch/wb_client.o $(WBCOMMON_OBJ) \ @@ -281,7 +283,7 @@ LIB_WITH_PROTO_OBJ = $(VERSION_OBJ) lib/charcnv.o lib/debug.o lib/fault.o \ lib/adt_tree.o lib/gencache.o $(TDB_OBJ) \ lib/module.o lib/events.o lib/ldap_escape.o @CHARSET_STATIC@ \ lib/secdesc.o lib/util_seaccess.o lib/secace.o lib/secacl.o \ - libads/krb5_errs.o lib/system_smbd.o lib/audit.o + libads/krb5_errs.o lib/system_smbd.o lib/audit.o $(LIBNDR_OBJ) LIB_OBJ = $(LIB_WITHOUT_PROTO_OBJ) $(LIB_WITH_PROTO_OBJ) @@ -347,7 +349,7 @@ LIBMSRPC_GEN_OBJ = librpc/gen_ndr/cli_unixinfo.o librpc/gen_ndr/cli_lsa.o \ librpc/gen_ndr/cli_srvsvc.o \ librpc/gen_ndr/cli_winreg.o librpc/gen_ndr/cli_initshutdown.o \ librpc/gen_ndr/cli_eventlog.o librpc/gen_ndr/cli_epmapper.o \ - $(LIBNDR_GEN_OBJ) $(LIBNDR_OBJ) $(RPCCLIENT_NDR_OBJ) + $(LIBNDR_GEN_OBJ) $(RPCCLIENT_NDR_OBJ) REGOBJS_OBJ = registry/reg_objects.o @@ -763,7 +765,7 @@ REPLACETORT_OBJ = lib/replace/test/testsuite.o \ $(LIBREPLACE_OBJ) NDRDUMP_OBJ = librpc/tools/ndrdump.o \ - $(PARAM_OBJ) $(LIBNDR_OBJ) $(LIBNDR_GEN_OBJ) \ + $(PARAM_OBJ) $(LIBNDR_GEN_OBJ) \ $(LIBSAMBA_OBJ) $(LIB_NONSMBD_OBJ) $(POPT_LIB_OBJ) \ $(RPC_PARSE_OBJ1) $(DOSERR_OBJ) $(SECRETS_OBJ) @@ -783,7 +785,7 @@ SMBFILTER_OBJ = utils/smbfilter.o $(PARAM_OBJ) $(LIBSMB_OBJ) $(SECRETS_OBJ) \ $(LIB_NONSMBD_OBJ) $(KRBCLIENT_OBJ) PROTO_OBJ = $(SMBD_OBJ_MAIN) $(LIBNDR_OBJ) $(RPCCLIENT_NDR_OBJ) \ - $(LIBNDR_GEN_OBJ) $(SMBD_OBJ_SRV) $(NMBD_OBJ1) $(LIBSMB_OBJ) \ + $(LIBNDR_GEN_OBJ) $(SMBD_OBJ_SRV) $(NMBD_OBJ1) $(LIBSMB_OBJ) \ $(SMBTORTURE_OBJ1) $(RPCCLIENT_OBJ1) \ $(LIBMSRPC_OBJ) \ $(LIB_WITH_PROTO_OBJ) \ diff --git a/source/include/includes.h b/source/include/includes.h index 1fa3aae8f79..bbce3bdb5c8 100644 --- a/source/include/includes.h +++ b/source/include/includes.h @@ -648,13 +648,13 @@ typedef int BOOL; #include "trans2.h" #include "nterr.h" #include "ntioctl.h" -#include "messages.h" #include "charset.h" #include "dynconfig.h" #include "util_getent.h" #include "debugparse.h" #include "version.h" #include "privileges.h" +#include "messages.h" #include "locking.h" #include "smb.h" #include "ads_cldap.h" diff --git a/source/include/messages.h b/source/include/messages.h index e3f29832add..6e4cf130986 100644 --- a/source/include/messages.h +++ b/source/include/messages.h @@ -100,4 +100,37 @@ struct server_id { pid_t pid; }; +struct messaging_context; +struct data_blob; + +unsigned int messages_pending_for_pid(struct messaging_context *msg_ctx, + struct server_id pid); +void message_dispatch(struct messaging_context *msg_ctx); +BOOL message_send_all(struct messaging_context *msg_ctx, + int msg_type, + const void *buf, size_t len, + int *n_sent); +void message_block(void); +void message_unblock(void); +struct event_context *messaging_event_context(struct messaging_context *msg_ctx); +struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, + struct server_id server_id, + struct event_context *ev); +NTSTATUS messaging_register(struct messaging_context *msg_ctx, + void *private_data, + uint32_t msg_type, + void (*fn)(struct messaging_context *msg, + void *private_data, + uint32_t msg_type, + struct server_id server_id, + struct data_blob *data)); +void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type, + void *private_data); +NTSTATUS messaging_send(struct messaging_context *msg_ctx, + struct server_id server, + uint32_t msg_type, const struct data_blob *data); +NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx, + struct server_id server, uint32_t msg_type, + const uint8 *buf, size_t len); + #endif diff --git a/source/include/smb.h b/source/include/smb.h index 1a5b43e564b..a7d63a6aeda 100644 --- a/source/include/smb.h +++ b/source/include/smb.h @@ -425,7 +425,6 @@ struct fd_handle { unsigned long file_id; }; -struct messaging_context; struct event_context; struct fd_event; struct timed_event; @@ -532,10 +531,10 @@ typedef struct files_struct { #include "sysquotas.h" /* used to hold an arbitrary blob of data */ -typedef struct data_blob_ { +typedef struct data_blob { uint8 *data; size_t length; - void (*free)(struct data_blob_ *data_blob); + void (*free)(struct data_blob *data_blob); } DATA_BLOB; extern const DATA_BLOB data_blob_null; diff --git a/source/lib/messages.c b/source/lib/messages.c index 95f4aba4e78..6932369b217 100644 --- a/source/lib/messages.c +++ b/source/lib/messages.c @@ -47,54 +47,30 @@ */ #include "includes.h" +#include "librpc/gen_ndr/messaging.h" +#include "librpc/gen_ndr/ndr_messaging.h" /* the locking database handle */ -static TDB_CONTEXT *tdb; static int received_signal; /* change the message version with any incompatible changes in the protocol */ -#define MESSAGE_VERSION 1 +#define MESSAGE_VERSION 2 -struct message_rec { - int msg_version; - int msg_type; - struct server_id dest; - struct server_id src; - size_t len; -}; - -/* we have a linked list of dispatch handlers */ -static struct dispatch_fns { - struct dispatch_fns *next, *prev; - int msg_type; - void (*fn)(int msg_type, struct server_id pid, void *buf, size_t len, - void *private_data); +struct messaging_callback { + struct messaging_callback *prev, *next; + uint32 msg_type; + void (*fn)(struct messaging_context *msg, void *private_data, + uint32_t msg_type, + struct server_id server_id, DATA_BLOB *data); void *private_data; -} *dispatch_fns; - -static void message_register(int msg_type, - void (*fn)(int msg_type, struct server_id pid, - void *buf, size_t len, - void *private_data), - void *private_data); - -/**************************************************************************** - Free global objects. -****************************************************************************/ +}; -void gfree_messages(void) -{ - struct dispatch_fns *dfn, *next; - - /* delete the dispatch_fns list */ - dfn = dispatch_fns; - while( dfn ) { - next = dfn->next; - DLIST_REMOVE(dispatch_fns, dfn); - SAFE_FREE(dfn); - dfn = next; - } -} +struct messaging_context { + TDB_CONTEXT *tdb; + struct server_id id; + struct event_context *event_ctx; + struct messaging_callback *callbacks; +}; /**************************************************************************** Notifications come in as signals. @@ -106,21 +82,25 @@ static void sig_usr1(void) sys_select_signal(SIGUSR1); } -static NTSTATUS message_send_pid(struct server_id pid, int msg_type, - const void *buf, size_t len); +static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb, + struct server_id pid, int msg_type, + const void *buf, size_t len); /**************************************************************************** A useful function for testing the message system. ****************************************************************************/ -static void ping_message(int msg_type, struct server_id src, - void *buf, size_t len, void *private_data) +static void ping_message(struct messaging_context *msg_ctx, + void *private_data, + uint32_t msg_type, + struct server_id src, + DATA_BLOB *data) { - const char *msg = buf ? (const char *)buf : "none"; + const char *msg = data->data ? (const char *)data->data : "none"; DEBUG(1,("INFO: Received PING message from PID %s [%s]\n", procid_str_static(&src), msg)); - message_send_pid(src, MSG_PONG, buf, len); + messaging_send(msg_ctx, src, MSG_PONG, data); } /**************************************************************************** @@ -131,24 +111,21 @@ static BOOL message_init(struct messaging_context *msg_ctx) { sec_init(); - if (tdb) - return True; - - tdb = tdb_open_log(lock_path("messages.tdb"), - 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, - O_RDWR|O_CREAT,0600); + msg_ctx->tdb = tdb_open_log(lock_path("messages.tdb"), + 0, TDB_CLEAR_IF_FIRST|TDB_DEFAULT, + O_RDWR|O_CREAT,0600); - if (!tdb) { + if (!msg_ctx->tdb) { DEBUG(0,("ERROR: Failed to initialise messages database\n")); return False; } /* Activate the per-hashchain freelist */ - tdb_set_max_dead(tdb, 5); + tdb_set_max_dead(msg_ctx->tdb, 5); CatchSignal(SIGUSR1, SIGNAL_CAST sig_usr1); - message_register(MSG_PING, ping_message, NULL); + messaging_register(msg_ctx, NULL, MSG_PING, ping_message); /* Register some debugging related messages */ @@ -175,6 +152,99 @@ static TDB_DATA message_key_pid(struct server_id pid) return kbuf; } +/* + Fetch the messaging array for a process + */ + +static NTSTATUS messaging_tdb_fetch(TDB_CONTEXT *msg_tdb, + TDB_DATA key, + TALLOC_CTX *mem_ctx, + struct messaging_array **presult) +{ + struct messaging_array *result; + TDB_DATA data; + DATA_BLOB blob; + NTSTATUS status; + + if (!(result = TALLOC_ZERO_P(mem_ctx, struct messaging_array))) { + return NT_STATUS_NO_MEMORY; + } + + data = tdb_fetch(msg_tdb, key); + + if (data.dptr == NULL) { + *presult = result; + return NT_STATUS_OK; + } + + blob = data_blob_const(data.dptr, data.dsize); + + status = ndr_pull_struct_blob( + &blob, result, result, + (ndr_pull_flags_fn_t)ndr_pull_messaging_array); + + SAFE_FREE(data.dptr); + + if (!NT_STATUS_IS_OK(status)) { + TALLOC_FREE(result); + return status; + } + + if (DEBUGLEVEL >= 10) { + DEBUG(10, ("messaging_tdb_fetch:\n")); + NDR_PRINT_DEBUG(messaging_array, result); + } + + *presult = result; + return NT_STATUS_OK; +} + +/* + Store a messaging array for a pid +*/ + +static NTSTATUS messaging_tdb_store(TDB_CONTEXT *msg_tdb, + TDB_DATA key, + struct messaging_array *array) +{ + TDB_DATA data; + DATA_BLOB blob; + NTSTATUS status; + TALLOC_CTX *mem_ctx; + int ret; + + if (array->num_messages == 0) { + tdb_delete(msg_tdb, key); + return NT_STATUS_OK; + } + + if (!(mem_ctx = talloc_new(array))) { + return NT_STATUS_NO_MEMORY; + } + + status = ndr_push_struct_blob( + &blob, mem_ctx, array, + (ndr_push_flags_fn_t)ndr_push_messaging_array); + + if (!NT_STATUS_IS_OK(status)) { + talloc_free(mem_ctx); + return status; + } + + if (DEBUGLEVEL >= 10) { + DEBUG(10, ("messaging_tdb_store:\n")); + NDR_PRINT_DEBUG(messaging_array, array); + } + + data.dptr = blob.data; + data.dsize = blob.length; + + ret = tdb_store(msg_tdb, key, data, TDB_REPLACE); + TALLOC_FREE(mem_ctx); + + return (ret == 0) ? NT_STATUS_OK : NT_STATUS_INTERNAL_DB_CORRUPTION; +} + /**************************************************************************** Notify a process that it has a message. If the process doesn't exist then delete its record in the database. @@ -216,17 +286,6 @@ static NTSTATUS message_notify(struct server_id procid) * Something has gone wrong */ - if (errno == ESRCH) { - DEBUG(2,("pid %d doesn't exist - deleting messages record\n", - (int)pid)); - tdb_delete(tdb, message_key_pid(procid)); - - /* - * INVALID_HANDLE is the closest I can think of -- vl - */ - return NT_STATUS_INVALID_HANDLE; - } - DEBUG(2,("message to process %d failed - %s\n", (int)pid, strerror(errno))); @@ -235,6 +294,7 @@ static NTSTATUS message_notify(struct server_id procid) * errormap.o into lots of utils. */ + if (errno == ESRCH) return NT_STATUS_INVALID_HANDLE; if (errno == EINVAL) return NT_STATUS_INVALID_PARAMETER; if (errno == EPERM) return NT_STATUS_ACCESS_DENIED; return NT_STATUS_UNSUCCESSFUL; @@ -244,12 +304,15 @@ static NTSTATUS message_notify(struct server_id procid) Send a message to a particular pid. ****************************************************************************/ -static NTSTATUS message_send_pid(struct server_id pid, int msg_type, - const void *buf, size_t len) +static NTSTATUS messaging_tdb_send(TDB_CONTEXT *msg_tdb, + struct server_id pid, int msg_type, + const void *buf, size_t len) { - TDB_DATA dbuf; - struct message_rec rec; - int ret; + struct messaging_array *msg_array; + struct messaging_rec *rec; + TALLOC_CTX *mem_ctx; + NTSTATUS status; + TDB_DATA key = message_key_pid(pid); /* NULL pointer means implicit length zero. */ if (!buf) { @@ -263,138 +326,129 @@ static NTSTATUS message_send_pid(struct server_id pid, int msg_type, SMB_ASSERT(procid_to_pid(&pid) > 0); - rec.msg_version = MESSAGE_VERSION; - rec.msg_type = msg_type; - rec.dest = pid; - rec.src = procid_self(); - rec.len = buf ? len : 0; + if (!(mem_ctx = talloc_init("message_send_pid"))) { + return NT_STATUS_NO_MEMORY; + } + + if (tdb_chainlock(msg_tdb, key) == -1) { + return NT_STATUS_LOCK_NOT_GRANTED; + } + + status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &msg_array); + + if (!NT_STATUS_IS_OK(status)) { + tdb_chainunlock(msg_tdb, key); + TALLOC_FREE(mem_ctx); + return status; + } - dbuf.dptr = (uint8 *)SMB_MALLOC(len + sizeof(rec)); - if (!dbuf.dptr) { + if (!(rec = TALLOC_REALLOC_ARRAY(mem_ctx, msg_array->messages, + struct messaging_rec, + msg_array->num_messages+1))) { + tdb_chainunlock(msg_tdb, key); + TALLOC_FREE(mem_ctx); return NT_STATUS_NO_MEMORY; } - memcpy(dbuf.dptr, &rec, sizeof(rec)); - if (len > 0 && buf) - memcpy((void *)((char*)dbuf.dptr+sizeof(rec)), buf, len); + rec[msg_array->num_messages].msg_version = MESSAGE_VERSION; + rec[msg_array->num_messages].msg_type = msg_type; + rec[msg_array->num_messages].dest = pid; + rec[msg_array->num_messages].src = procid_self(); + rec[msg_array->num_messages].buf = data_blob_const(buf, len); - dbuf.dsize = len + sizeof(rec); + msg_array->messages = rec; + msg_array->num_messages += 1; - ret = tdb_append(tdb, message_key_pid(pid), dbuf); + status = messaging_tdb_store(msg_tdb, key, msg_array); - SAFE_FREE(dbuf.dptr); + tdb_chainunlock(msg_tdb, key); + TALLOC_FREE(mem_ctx); + + if (!NT_STATUS_IS_OK(status)) { + return status; + } + + status = message_notify(pid); - if (ret == -1) { - return NT_STATUS_INTERNAL_ERROR; + if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) { + DEBUG(2, ("pid %s doesn't exist - deleting messages record\n", + procid_str_static(&pid))); + tdb_delete(msg_tdb, message_key_pid(pid)); } - errno = 0; /* paranoia */ - return message_notify(pid); + return status; } /**************************************************************************** Count the messages pending for a particular pid. Expensive.... ****************************************************************************/ -unsigned int messages_pending_for_pid(struct server_id pid) +unsigned int messages_pending_for_pid(struct messaging_context *msg_ctx, + struct server_id pid) { - TDB_DATA dbuf; - uint8 *buf; - unsigned int message_count = 0; + struct messaging_array *msg_array; + unsigned int result; - dbuf = tdb_fetch(tdb, message_key_pid(pid)); - if (dbuf.dptr == NULL || dbuf.dsize == 0) { - SAFE_FREE(dbuf.dptr); + if (!NT_STATUS_IS_OK(messaging_tdb_fetch(msg_ctx->tdb, + message_key_pid(pid), NULL, + &msg_array))) { + DEBUG(10, ("messaging_tdb_fetch failed\n")); return 0; } - for (buf = dbuf.dptr; dbuf.dsize > sizeof(struct message_rec);) { - struct message_rec rec; - memcpy(&rec, buf, sizeof(rec)); - buf += (sizeof(rec) + rec.len); - dbuf.dsize -= (sizeof(rec) + rec.len); - message_count++; - } - - SAFE_FREE(dbuf.dptr); - return message_count; -} + result = msg_array->num_messages; + TALLOC_FREE(msg_array); + return result; +} /**************************************************************************** Retrieve all messages for the current process. ****************************************************************************/ -static BOOL retrieve_all_messages(char **msgs_buf, size_t *total_len) +static NTSTATUS retrieve_all_messages(TDB_CONTEXT *msg_tdb, + TALLOC_CTX *mem_ctx, + struct messaging_array **presult) { - TDB_DATA kbuf; - TDB_DATA dbuf; - TDB_DATA null_dbuf; - - ZERO_STRUCT(null_dbuf); - - *msgs_buf = NULL; - *total_len = 0; + struct messaging_array *result; + TDB_DATA key = message_key_pid(procid_self()); + NTSTATUS status; - kbuf = message_key_pid(procid_self()); + if (tdb_chainlock(msg_tdb, key) == -1) { + return NT_STATUS_LOCK_NOT_GRANTED; + } - if (tdb_chainlock(tdb, kbuf) == -1) - return False; + status = messaging_tdb_fetch(msg_tdb, key, mem_ctx, &result); - dbuf = tdb_fetch(tdb, kbuf); /* - * Replace with an empty record to keep the allocated - * space in the tdb. + * We delete the record here, tdb_set_max_dead keeps it around */ - tdb_store(tdb, kbuf, null_dbuf, TDB_REPLACE); - tdb_chainunlock(tdb, kbuf); + tdb_delete(msg_tdb, key); + tdb_chainunlock(msg_tdb, key); - if (dbuf.dptr == NULL || dbuf.dsize == 0) { - SAFE_FREE(dbuf.dptr); - return False; + if (NT_STATUS_IS_OK(status)) { + *presult = result; } - *msgs_buf = (char *)dbuf.dptr; - *total_len = dbuf.dsize; - - return True; + return status; } -/**************************************************************************** - Parse out the next message for the current process. -****************************************************************************/ - -static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, - struct server_id *src, char **buf, size_t *len) +/* + Dispatch one messsaging_rec +*/ +static void messaging_dispatch_rec(struct messaging_context *msg_ctx, + struct messaging_rec *rec) { - struct message_rec rec; - char *ret_buf = *buf; - - *buf = NULL; - *len = 0; - - if (total_len - (ret_buf - msgs_buf) < sizeof(rec)) - return False; - - memcpy(&rec, ret_buf, sizeof(rec)); - ret_buf += sizeof(rec); - - if (rec.msg_version != MESSAGE_VERSION) { - DEBUG(0,("message version %d received (expected %d)\n", - rec.msg_version, MESSAGE_VERSION)); - return False; - } + struct messaging_callback *cb, *next; - if (rec.len > 0) { - if (total_len - (ret_buf - msgs_buf) < rec.len) - return False; + for (cb = msg_ctx->callbacks; cb != NULL; cb = next) { + next = cb->next; + if (cb->msg_type == rec->msg_type) { + cb->fn(msg_ctx, cb->private_data, rec->msg_type, + rec->src, &rec->buf); + return; + } } - - *len = rec.len; - *msg_type = rec.msg_type; - *src = rec.src; - *buf = ret_buf; - - return True; + return; } /**************************************************************************** @@ -404,14 +458,10 @@ static BOOL message_recv(char *msgs_buf, size_t total_len, int *msg_type, messages on an *odd* byte boundary. ****************************************************************************/ -void message_dispatch(void) +void message_dispatch(struct messaging_context *msg_ctx) { - int msg_type; - struct server_id src; - char *buf; - char *msgs_buf; - size_t len, total_len; - int n_handled; + struct messaging_array *msg_array = NULL; + uint32 i; if (!received_signal) return; @@ -421,37 +471,16 @@ void message_dispatch(void) received_signal = 0; - if (!retrieve_all_messages(&msgs_buf, &total_len)) + if (!NT_STATUS_IS_OK(retrieve_all_messages(msg_ctx->tdb, NULL, + &msg_array))) { return; + } - for (buf = msgs_buf; - message_recv(msgs_buf, total_len, &msg_type, &src, &buf, &len); - buf += len) { - struct dispatch_fns *dfn; - - DEBUG(10,("message_dispatch: received msg_type=%d " - "src_pid=%u\n", msg_type, - (unsigned int) procid_to_pid(&src))); - - n_handled = 0; - for (dfn = dispatch_fns; dfn; dfn = dfn->next) { - if (dfn->msg_type == msg_type) { - DEBUG(10,("message_dispatch: processing " - "message of type %d.\n", msg_type)); - dfn->fn(msg_type, src, - len ? (void *)buf : NULL, len, - dfn->private_data); - n_handled++; - break; - } - } - if (!n_handled) { - DEBUG(5,("message_dispatch: warning: no handler " - "registed for msg_type %d in pid %u\n", - msg_type, (unsigned int)sys_getpid())); - } + for (i=0; inum_messages; i++) { + messaging_dispatch_rec(msg_ctx, &msg_array->messages[i]); } - SAFE_FREE(msgs_buf); + + TALLOC_FREE(msg_array); } /**************************************************************************** @@ -461,60 +490,12 @@ void message_dispatch(void) messages on an *odd* byte boundary. ****************************************************************************/ -static void message_register(int msg_type, - void (*fn)(int msg_type, struct server_id pid, - void *buf, size_t len, - void *private_data), - void *private_data) -{ - struct dispatch_fns *dfn; - - for (dfn = dispatch_fns; dfn; dfn = dfn->next) { - if (dfn->msg_type == msg_type) { - dfn->fn = fn; - return; - } - } - - if (!(dfn = SMB_MALLOC_P(struct dispatch_fns))) { - DEBUG(0,("message_register: Not enough memory. malloc " - "failed!\n")); - return; - } - - ZERO_STRUCTPN(dfn); - - dfn->msg_type = msg_type; - dfn->fn = fn; - dfn->private_data = private_data; - - DLIST_ADD(dispatch_fns, dfn); -} - -/**************************************************************************** - De-register the function for a particular message type. -****************************************************************************/ - -static void message_deregister(int msg_type) -{ - struct dispatch_fns *dfn, *next; - - for (dfn = dispatch_fns; dfn; dfn = next) { - next = dfn->next; - if (dfn->msg_type == msg_type) { - DLIST_REMOVE(dispatch_fns, dfn); - SAFE_FREE(dfn); - return; - } - } -} - struct msg_all { + struct messaging_context *msg_ctx; int msg_type; uint32 msg_flag; const void *buf; size_t len; - BOOL duplicates; int n_sent; }; @@ -522,41 +503,44 @@ struct msg_all { Send one of the messages for the broadcast. ****************************************************************************/ -static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, - void *state) +static int traverse_fn(TDB_CONTEXT *the_tdb, + const struct connections_key *ckey, + const struct connections_data *crec, + void *private_data) { - struct connections_data crec; - struct msg_all *msg_all = (struct msg_all *)state; + struct msg_all *msg_all = (struct msg_all *)private_data; NTSTATUS status; - if (dbuf.dsize != sizeof(crec)) - return 0; - - memcpy(&crec, dbuf.dptr, sizeof(crec)); - - if (crec.cnum != -1) + if (crec->cnum != -1) return 0; /* Don't send if the receiver hasn't registered an interest. */ - if(!(crec.bcast_msg_flags & msg_all->msg_flag)) + if(!(crec->bcast_msg_flags & msg_all->msg_flag)) return 0; /* If the msg send fails because the pid was not found (i.e. smbd died), * the msg has already been deleted from the messages.tdb.*/ - status = message_send_pid(crec.pid, msg_all->msg_type, - msg_all->buf, msg_all->len); + status = messaging_send_buf(msg_all->msg_ctx, + crec->pid, msg_all->msg_type, + (uint8 *)msg_all->buf, msg_all->len); if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) { + + TDB_DATA key; /* If the pid was not found delete the entry from * connections.tdb */ DEBUG(2,("pid %s doesn't exist - deleting connections " - "%d [%s]\n", procid_str_static(&crec.pid), crec.cnum, - crec.servicename)); - tdb_delete(the_tdb, kbuf); + "%d [%s]\n", procid_str_static(&crec->pid), + crec->cnum, crec->servicename)); + + key.dptr = (uint8 *)ckey; + key.dsize = sizeof(*ckey); + + tdb_delete(the_tdb, key); } msg_all->n_sent++; return 0; @@ -577,7 +561,6 @@ static int traverse_fn(TDB_CONTEXT *the_tdb, TDB_DATA kbuf, TDB_DATA dbuf, BOOL message_send_all(struct messaging_context *msg_ctx, int msg_type, const void *buf, size_t len, - BOOL duplicates_allowed, int *n_sent) { struct msg_all msg_all; @@ -598,10 +581,10 @@ BOOL message_send_all(struct messaging_context *msg_ctx, msg_all.buf = buf; msg_all.len = len; - msg_all.duplicates = duplicates_allowed; msg_all.n_sent = 0; + msg_all.msg_ctx = msg_ctx; - connections_traverse(traverse_fn, &msg_all); + connections_forall(traverse_fn, &msg_all); if (n_sent) *n_sent = msg_all.n_sent; return True; @@ -622,40 +605,6 @@ void message_unblock(void) BlockSignals(False, SIGUSR1); } -/* - * Samba4 API wrapper around the Samba3 implementation. Yes, I know, we could - * import the whole Samba4 thing, but I want notify.c from Samba4 in first. - */ - -struct messaging_callback { - struct messaging_callback *prev, *next; - uint32 msg_type; - void (*fn)(struct messaging_context *msg, void *private_data, - uint32_t msg_type, - struct server_id server_id, DATA_BLOB *data); - void *private_data; -}; - -struct messaging_context { - struct server_id id; - struct event_context *event_ctx; - struct messaging_callback *callbacks; -}; - -static int messaging_context_destructor(struct messaging_context *ctx) -{ - struct messaging_callback *cb; - - for (cb = ctx->callbacks; cb; cb = cb->next) { - /* - * We unconditionally remove all instances of our callback - * from the tdb basis. - */ - message_deregister(cb->msg_type); - } - return 0; -} - struct event_context *messaging_event_context(struct messaging_context *msg_ctx) { return msg_ctx->event_ctx; @@ -673,7 +622,6 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, ctx->id = server_id; ctx->event_ctx = ev; - talloc_set_destructor(ctx, messaging_context_destructor); if (!message_init(ctx)) { DEBUG(0, ("message_init failed: %s\n", strerror(errno))); @@ -683,35 +631,12 @@ struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, return ctx; } -static void messaging_callback(int msg_type, struct server_id pid, - void *buf, size_t len, void *private_data) -{ - struct messaging_context *ctx = talloc_get_type_abort( - private_data, struct messaging_context); - struct messaging_callback *cb, *next; - - for (cb = ctx->callbacks; cb; cb = next) { - /* - * Allow a callback to remove itself - */ - next = cb->next; - - if (msg_type == cb->msg_type) { - DATA_BLOB blob; - - blob.data = (uint8 *)buf; - blob.length = len; - - cb->fn(ctx, cb->private_data, msg_type, pid, &blob); - } - } -} - /* * Register a dispatch function for a particular message type. Allow multiple * registrants */ -NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data, +NTSTATUS messaging_register(struct messaging_context *msg_ctx, + void *private_data, uint32_t msg_type, void (*fn)(struct messaging_context *msg, void *private_data, @@ -721,7 +646,19 @@ NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data, { struct messaging_callback *cb; - if (!(cb = talloc(ctx, struct messaging_callback))) { + /* + * Only one callback per type + */ + + for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) { + if (cb->msg_type == msg_type) { + cb->fn = fn; + cb->private_data = private_data; + return NT_STATUS_OK; + } + } + + if (!(cb = talloc(msg_ctx, struct messaging_callback))) { return NT_STATUS_NO_MEMORY; } @@ -729,8 +666,7 @@ NTSTATUS messaging_register(struct messaging_context *ctx, void *private_data, cb->fn = fn; cb->private_data = private_data; - DLIST_ADD(ctx->callbacks, cb); - message_register(msg_type, messaging_callback, ctx); + DLIST_ADD(msg_ctx->callbacks, cb); return NT_STATUS_OK; } @@ -759,7 +695,8 @@ NTSTATUS messaging_send(struct messaging_context *msg_ctx, struct server_id server, uint32_t msg_type, const DATA_BLOB *data) { - return message_send_pid(server, msg_type, data->data, data->length); + return messaging_tdb_send(msg_ctx->tdb, server, msg_type, + data->data, data->length); } NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx, diff --git a/source/lib/util.c b/source/lib/util.c index 4425c2e1cc2..36396d9f832 100644 --- a/source/lib/util.c +++ b/source/lib/util.c @@ -199,7 +199,6 @@ void gfree_all( void ) gfree_case_tables(); gfree_debugsyms(); gfree_charcnv(); - gfree_messages(); gfree_interfaces(); /* release the talloc null_context memory last */ diff --git a/source/librpc/gen_ndr/messaging.h b/source/librpc/gen_ndr/messaging.h new file mode 100644 index 00000000000..71340c09874 --- /dev/null +++ b/source/librpc/gen_ndr/messaging.h @@ -0,0 +1,21 @@ +/* header auto-generated by pidl */ + +#include + +#ifndef _HEADER_messaging +#define _HEADER_messaging + +struct messaging_rec { + uint32_t msg_version; + uint32_t msg_type; + struct server_id dest; + struct server_id src; + DATA_BLOB buf; +}/* [public] */; + +struct messaging_array { + uint32_t num_messages; + struct messaging_rec *messages; +}/* [public] */; + +#endif /* _HEADER_messaging */ diff --git a/source/librpc/gen_ndr/ndr_messaging.c b/source/librpc/gen_ndr/ndr_messaging.c new file mode 100644 index 00000000000..21aec98a989 --- /dev/null +++ b/source/librpc/gen_ndr/ndr_messaging.c @@ -0,0 +1,115 @@ +/* parser auto-generated by pidl */ + +#include "includes.h" +#include "librpc/gen_ndr/ndr_messaging.h" + +_PUBLIC_ NTSTATUS ndr_push_messaging_rec(struct ndr_push *ndr, int ndr_flags, const struct messaging_rec *r) +{ + if (ndr_flags & NDR_SCALARS) { + NDR_CHECK(ndr_push_align(ndr, 4)); + NDR_CHECK(ndr_push_uint32(ndr, NDR_SCALARS, r->msg_version)); + NDR_CHECK(ndr_push_uint32(ndr, NDR_SCALARS, r->msg_type)); + NDR_CHECK(ndr_push_server_id(ndr, NDR_SCALARS, &r->dest)); + NDR_CHECK(ndr_push_server_id(ndr, NDR_SCALARS, &r->src)); + NDR_CHECK(ndr_push_DATA_BLOB(ndr, NDR_SCALARS, r->buf)); + } + if (ndr_flags & NDR_BUFFERS) { + NDR_CHECK(ndr_push_server_id(ndr, NDR_BUFFERS, &r->dest)); + NDR_CHECK(ndr_push_server_id(ndr, NDR_BUFFERS, &r->src)); + } + return NT_STATUS_OK; +} + +_PUBLIC_ NTSTATUS ndr_pull_messaging_rec(struct ndr_pull *ndr, int ndr_flags, struct messaging_rec *r) +{ + if (ndr_flags & NDR_SCALARS) { + NDR_CHECK(ndr_pull_align(ndr, 4)); + NDR_CHECK(ndr_pull_uint32(ndr, NDR_SCALARS, &r->msg_version)); + NDR_CHECK(ndr_pull_uint32(ndr, NDR_SCALARS, &r->msg_type)); + NDR_CHECK(ndr_pull_server_id(ndr, NDR_SCALARS, &r->dest)); + NDR_CHECK(ndr_pull_server_id(ndr, NDR_SCALARS, &r->src)); + NDR_CHECK(ndr_pull_DATA_BLOB(ndr, NDR_SCALARS, &r->buf)); + } + if (ndr_flags & NDR_BUFFERS) { + NDR_CHECK(ndr_pull_server_id(ndr, NDR_BUFFERS, &r->dest)); + NDR_CHECK(ndr_pull_server_id(ndr, NDR_BUFFERS, &r->src)); + } + return NT_STATUS_OK; +} + +_PUBLIC_ void ndr_print_messaging_rec(struct ndr_print *ndr, const char *name, const struct messaging_rec *r) +{ + ndr_print_struct(ndr, name, "messaging_rec"); + ndr->depth++; + ndr_print_uint32(ndr, "msg_version", r->msg_version); + ndr_print_uint32(ndr, "msg_type", r->msg_type); + ndr_print_server_id(ndr, "dest", &r->dest); + ndr_print_server_id(ndr, "src", &r->src); + ndr_print_DATA_BLOB(ndr, "buf", r->buf); + ndr->depth--; +} + +_PUBLIC_ NTSTATUS ndr_push_messaging_array(struct ndr_push *ndr, int ndr_flags, const struct messaging_array *r) +{ + uint32_t cntr_messages_0; + if (ndr_flags & NDR_SCALARS) { + NDR_CHECK(ndr_push_align(ndr, 4)); + NDR_CHECK(ndr_push_uint32(ndr, NDR_SCALARS, r->num_messages)); + for (cntr_messages_0 = 0; cntr_messages_0 < r->num_messages; cntr_messages_0++) { + NDR_CHECK(ndr_push_messaging_rec(ndr, NDR_SCALARS, &r->messages[cntr_messages_0])); + } + } + if (ndr_flags & NDR_BUFFERS) { + for (cntr_messages_0 = 0; cntr_messages_0 < r->num_messages; cntr_messages_0++) { + NDR_CHECK(ndr_push_messaging_rec(ndr, NDR_BUFFERS, &r->messages[cntr_messages_0])); + } + } + return NT_STATUS_OK; +} + +_PUBLIC_ NTSTATUS ndr_pull_messaging_array(struct ndr_pull *ndr, int ndr_flags, struct messaging_array *r) +{ + uint32_t cntr_messages_0; + TALLOC_CTX *_mem_save_messages_0; + if (ndr_flags & NDR_SCALARS) { + NDR_CHECK(ndr_pull_align(ndr, 4)); + NDR_CHECK(ndr_pull_uint32(ndr, NDR_SCALARS, &r->num_messages)); + NDR_PULL_ALLOC_N(ndr, r->messages, r->num_messages); + _mem_save_messages_0 = NDR_PULL_GET_MEM_CTX(ndr); + NDR_PULL_SET_MEM_CTX(ndr, r->messages, 0); + for (cntr_messages_0 = 0; cntr_messages_0 < r->num_messages; cntr_messages_0++) { + NDR_CHECK(ndr_pull_messaging_rec(ndr, NDR_SCALARS, &r->messages[cntr_messages_0])); + } + NDR_PULL_SET_MEM_CTX(ndr, _mem_save_messages_0, 0); + } + if (ndr_flags & NDR_BUFFERS) { + _mem_save_messages_0 = NDR_PULL_GET_MEM_CTX(ndr); + NDR_PULL_SET_MEM_CTX(ndr, r->messages, 0); + for (cntr_messages_0 = 0; cntr_messages_0 < r->num_messages; cntr_messages_0++) { + NDR_CHECK(ndr_pull_messaging_rec(ndr, NDR_BUFFERS, &r->messages[cntr_messages_0])); + } + NDR_PULL_SET_MEM_CTX(ndr, _mem_save_messages_0, 0); + } + return NT_STATUS_OK; +} + +_PUBLIC_ void ndr_print_messaging_array(struct ndr_print *ndr, const char *name, const struct messaging_array *r) +{ + uint32_t cntr_messages_0; + ndr_print_struct(ndr, name, "messaging_array"); + ndr->depth++; + ndr_print_uint32(ndr, "num_messages", r->num_messages); + ndr->print(ndr, "%s: ARRAY(%d)", "messages", r->num_messages); + ndr->depth++; + for (cntr_messages_0=0;cntr_messages_0num_messages;cntr_messages_0++) { + char *idx_0=NULL; + asprintf(&idx_0, "[%d]", cntr_messages_0); + if (idx_0) { + ndr_print_messaging_rec(ndr, "messages", &r->messages[cntr_messages_0]); + free(idx_0); + } + } + ndr->depth--; + ndr->depth--; +} + diff --git a/source/librpc/gen_ndr/ndr_messaging.h b/source/librpc/gen_ndr/ndr_messaging.h new file mode 100644 index 00000000000..c1093c76fe1 --- /dev/null +++ b/source/librpc/gen_ndr/ndr_messaging.h @@ -0,0 +1,16 @@ +/* header auto-generated by pidl */ + +#include "librpc/gen_ndr/messaging.h" + +#ifndef _HEADER_NDR_messaging +#define _HEADER_NDR_messaging + +#include "librpc/ndr/libndr.h" +#define DCERPC_MESSAGING_CALL_COUNT (0) +NTSTATUS ndr_push_messaging_rec(struct ndr_push *ndr, int ndr_flags, const struct messaging_rec *r); +NTSTATUS ndr_pull_messaging_rec(struct ndr_pull *ndr, int ndr_flags, struct messaging_rec *r); +void ndr_print_messaging_rec(struct ndr_print *ndr, const char *name, const struct messaging_rec *r); +NTSTATUS ndr_push_messaging_array(struct ndr_push *ndr, int ndr_flags, const struct messaging_array *r); +NTSTATUS ndr_pull_messaging_array(struct ndr_pull *ndr, int ndr_flags, struct messaging_array *r); +void ndr_print_messaging_array(struct ndr_print *ndr, const char *name, const struct messaging_array *r); +#endif /* _HEADER_NDR_messaging */ diff --git a/source/librpc/idl/messaging.idl b/source/librpc/idl/messaging.idl new file mode 100644 index 00000000000..b1158ca189b --- /dev/null +++ b/source/librpc/idl/messaging.idl @@ -0,0 +1,26 @@ +#include "idl_types.h" + +/* + IDL structures for messaging code +*/ + +[ + pointer_default(unique) +] +interface messaging +{ + /* messaging struct sent across the sockets and stored in the tdb */ + + typedef [public] struct { + uint32 msg_version; + uint32 msg_type; + server_id dest; + server_id src; + DATA_BLOB buf; + } messaging_rec; + + typedef [public] struct { + uint32 num_messages; + messaging_rec messages[num_messages]; + } messaging_array; +} diff --git a/source/nmbd/nmbd.c b/source/nmbd/nmbd.c index 405aed3428c..8c94ad842b1 100644 --- a/source/nmbd/nmbd.c +++ b/source/nmbd/nmbd.c @@ -376,7 +376,7 @@ static void process(void) /* Check for internal messages */ - message_dispatch(); + message_dispatch(nmbd_messaging_context()); /* * Check all broadcast subnets to see if diff --git a/source/nmbd/nmbd_processlogon.c b/source/nmbd/nmbd_processlogon.c index 0d869f1586b..287f3e8897d 100644 --- a/source/nmbd/nmbd_processlogon.c +++ b/source/nmbd/nmbd_processlogon.c @@ -41,7 +41,7 @@ static void send_repl_message(uint32 low_serial) DEBUG(3, ("sending replication message, serial = 0x%04x\n", low_serial)); message_send_all(nmbd_messaging_context(), MSG_SMB_SAM_REPL, - &low_serial, sizeof(low_serial), False, NULL); + &low_serial, sizeof(low_serial), NULL); } /**************************************************************************** diff --git a/source/nsswitch/winbindd.c b/source/nsswitch/winbindd.c index ed0bdeb13ad..8e6d2d0bd2d 100644 --- a/source/nsswitch/winbindd.c +++ b/source/nsswitch/winbindd.c @@ -744,7 +744,7 @@ static int process_loop(int listen_sock, int listen_priv_sock) /* Handle messages */ - message_dispatch(); + message_dispatch(winbind_messaging_context()); run_events(winbind_event_context(), 0, NULL, NULL); diff --git a/source/nsswitch/winbindd_dual.c b/source/nsswitch/winbindd_dual.c index e89ae771c12..2e8ad154db3 100644 --- a/source/nsswitch/winbindd_dual.c +++ b/source/nsswitch/winbindd_dual.c @@ -973,7 +973,7 @@ static BOOL fork_domain_child(struct winbindd_child *child) /* Handle messages */ - message_dispatch(); + message_dispatch(winbind_messaging_context()); FD_ZERO(&read_fds); FD_SET(state.sock, &read_fds); diff --git a/source/printing/notify.c b/source/printing/notify.c index 37ae0037833..23cdcc3361e 100644 --- a/source/printing/notify.c +++ b/source/printing/notify.c @@ -183,7 +183,8 @@ static void print_notify_send_messages_to_printer(struct messaging_context *msg_ } for (i = 0; i < num_pids; i++) { - unsigned int q_len = messages_pending_for_pid(pid_to_procid(pid_list[i])); + unsigned int q_len = messages_pending_for_pid( + msg_ctx, pid_to_procid(pid_list[i])); if (q_len > 1000) { DEBUG(5, ("print_notify_send_messages_to_printer: discarding notify to printer %s as queue length = %u\n", printer, q_len )); diff --git a/source/printing/printing.c b/source/printing/printing.c index 6717f473cc5..6101f9a0f59 100644 --- a/source/printing/printing.c +++ b/source/printing/printing.c @@ -1426,7 +1426,7 @@ void start_background_queue(void) /* now check for messages */ DEBUG(10,("start_background_queue: background LPQ thread got a message\n")); - message_dispatch(); + message_dispatch(smbd_messaging_context()); /* process any pending print change notify messages */ diff --git a/source/rpc_server/srv_netlog_nt.c b/source/rpc_server/srv_netlog_nt.c index 4dd04c02881..2bb872874b0 100644 --- a/source/rpc_server/srv_netlog_nt.c +++ b/source/rpc_server/srv_netlog_nt.c @@ -77,7 +77,7 @@ static void send_sync_message(void) { DEBUG(3, ("sending sam synchronisation message\n")); message_send_all(smbd_messaging_context(), MSG_SMB_SAM_SYNC, NULL, 0, - False, NULL); + NULL); } /************************************************************************* diff --git a/source/rpc_server/srv_spoolss_nt.c b/source/rpc_server/srv_spoolss_nt.c index 890d2e08855..2047e13df36 100644 --- a/source/rpc_server/srv_spoolss_nt.c +++ b/source/rpc_server/srv_spoolss_nt.c @@ -313,7 +313,7 @@ WERROR delete_printer_hook( NT_USER_TOKEN *token, const char *sharename ) if ( (ret = smbrun(command, NULL)) == 0 ) { /* Tell everyone we updated smb.conf. */ message_send_all(smbd_messaging_context(), - MSG_SMB_CONF_UPDATED, NULL, 0, False, NULL); + MSG_SMB_CONF_UPDATED, NULL, 0, NULL); } if ( is_print_op ) @@ -6269,7 +6269,7 @@ BOOL add_printer_hook(NT_USER_TOKEN *token, NT_PRINTER_INFO_LEVEL *printer) if ( (ret = smbrun(command, &fd)) == 0 ) { /* Tell everyone we updated smb.conf. */ message_send_all(smbd_messaging_context(), - MSG_SMB_CONF_UPDATED, NULL, 0, False, NULL); + MSG_SMB_CONF_UPDATED, NULL, 0, NULL); } if ( is_print_op ) diff --git a/source/rpc_server/srv_srvsvc_nt.c b/source/rpc_server/srv_srvsvc_nt.c index cf3268d44d9..df7cd06b67d 100644 --- a/source/rpc_server/srv_srvsvc_nt.c +++ b/source/rpc_server/srv_srvsvc_nt.c @@ -1423,7 +1423,7 @@ static WERROR add_share(const char *share_name, const char *path, /* Tell everyone we updated smb.conf. */ message_send_all(smbd_messaging_context(), MSG_SMB_CONF_UPDATED, - NULL, 0, False, NULL); + NULL, 0, NULL); } if ( is_disk_op ) @@ -1520,7 +1520,7 @@ static WERROR delete_share(const char *sharename, /* Tell everyone we updated smb.conf. */ message_send_all(smbd_messaging_context(), MSG_SMB_CONF_UPDATED, - NULL, 0, False, NULL); + NULL, 0, NULL); } if ( is_disk_op ) @@ -1579,7 +1579,7 @@ static WERROR change_share(const char *share_name, const char *path, /* Tell everyone we updated smb.conf. */ message_send_all(smbd_messaging_context(), MSG_SMB_CONF_UPDATED, - NULL, 0, False, NULL); + NULL, 0, NULL); } if ( is_disk_op ) diff --git a/source/smbd/lanman.c b/source/smbd/lanman.c index fae63120802..0a9a529a85f 100644 --- a/source/smbd/lanman.c +++ b/source/smbd/lanman.c @@ -1895,8 +1895,7 @@ static BOOL api_RNetShareAdd(connection_struct *conn,uint16 vuid, } else { SAFE_FREE(command); message_send_all(smbd_messaging_context(), - MSG_SMB_CONF_UPDATED, NULL, 0, - False, NULL); + MSG_SMB_CONF_UPDATED, NULL, 0, NULL); } } else { return False; diff --git a/source/smbd/process.c b/source/smbd/process.c index 02dc507e11e..3b922af51fe 100644 --- a/source/smbd/process.c +++ b/source/smbd/process.c @@ -380,7 +380,7 @@ static BOOL receive_message_or_smb(char *buffer, int buffer_len, int timeout) * messages as we need to synchronously process any messages * we may have sent to ourselves from the previous SMB. */ - message_dispatch(); + message_dispatch(smbd_messaging_context()); /* * Check to see if we already have a message on the deferred open queue diff --git a/source/smbd/server.c b/source/smbd/server.c index 7a139afa476..6746271c094 100644 --- a/source/smbd/server.c +++ b/source/smbd/server.c @@ -392,7 +392,7 @@ static BOOL open_sockets_smbd(enum smb_server_mode server_mode, const char *smb_ lp_TALLOC_FREE(); /* Ensure we respond to PING and DEBUG messages from the main smbd. */ - message_dispatch(); + message_dispatch(smbd_messaging_context()); if (got_sig_cld) { pid_t pid; diff --git a/source/smbd/statcache.c b/source/smbd/statcache.c index 1d6646257a9..b63dd165a76 100644 --- a/source/smbd/statcache.c +++ b/source/smbd/statcache.c @@ -296,7 +296,6 @@ void send_stat_cache_delete_message(const char *name) MSG_SMB_STAT_CACHE_DELETE, name, strlen(name)+1, - True, NULL); #endif } diff --git a/source/torture/msgtest.c b/source/torture/msgtest.c index 20e5230a60f..d8c280e69bf 100644 --- a/source/torture/msgtest.c +++ b/source/torture/msgtest.c @@ -75,7 +75,7 @@ static void pong_message(struct messaging_context *msg_ctx, } while (pong_count < i) { - message_dispatch(); + message_dispatch(msg_ctx); smb_msleep(1); } @@ -92,7 +92,7 @@ static void pong_message(struct messaging_context *msg_ctx, } for (i=0;i pong_count + 20) { - message_dispatch(); + message_dispatch(msg_ctx); } } printf("waiting for %d remaining replies (done %d)\n", (int)(ping_count - pong_count), pong_count); while (timeval_elapsed(&tv) < 30 && pong_count < ping_count) { - message_dispatch(); + message_dispatch(msg_ctx); } if (ping_count != pong_count) { diff --git a/source/utils/smbcontrol.c b/source/utils/smbcontrol.c index 637cdacffa4..4f388637ad9 100644 --- a/source/utils/smbcontrol.c +++ b/source/utils/smbcontrol.c @@ -49,8 +49,7 @@ static int num_replies; /* Used by message callback fns */ static BOOL send_message(struct messaging_context *msg_ctx, struct server_id pid, int msg_type, - const void *buf, int len, - BOOL duplicates) + const void *buf, int len) { BOOL ret; int n_sent = 0; @@ -60,8 +59,7 @@ static BOOL send_message(struct messaging_context *msg_ctx, messaging_send_buf(msg_ctx, pid, msg_type, (uint8 *)buf, len)); - ret = message_send_all(msg_ctx, msg_type, buf, len, duplicates, - &n_sent); + ret = message_send_all(msg_ctx, msg_type, buf, len, &n_sent); DEBUG(10,("smbcontrol/send_message: broadcast message to " "%d processes\n", n_sent)); @@ -79,7 +77,7 @@ static void wait_replies(struct messaging_context *msg_ctx, busy-wait here as there is no nicer way to do it. */ do { - message_dispatch(); + message_dispatch(msg_ctx); event_loop_once(messaging_event_context(msg_ctx)); if (num_replies > 0 && !multiple_replies) break; @@ -140,8 +138,8 @@ static BOOL do_debug(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_DEBUG, argv[1], strlen(argv[1]) + 1, False); + return send_message(msg_ctx, pid, MSG_DEBUG, argv[1], + strlen(argv[1]) + 1); } #if defined(HAVE_LIBUNWIND_PTRACE) && defined(HAVE_LINUX_PTRACE) @@ -336,7 +334,7 @@ static BOOL do_inject_fault(struct messaging_context *msg_ctx, } return send_message(msg_ctx, pid, MSG_SMB_INJECT_FAULT, - &sig, sizeof(int), False); + &sig, sizeof(int)); } #endif /* DEVELOPER */ } @@ -352,8 +350,7 @@ static BOOL do_election(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_FORCE_ELECTION, NULL, 0, False); + return send_message(msg_ctx, pid, MSG_FORCE_ELECTION, NULL, 0); } /* Ping a samba daemon process */ @@ -381,7 +378,7 @@ static BOOL do_ping(struct messaging_context *msg_ctx, /* Send a message and register our interest in a reply */ - if (!send_message(msg_ctx, pid, MSG_PING, NULL, 0, False)) + if (!send_message(msg_ctx, pid, MSG_PING, NULL, 0)) return False; messaging_register(msg_ctx, NULL, MSG_PONG, pong_cb); @@ -425,7 +422,7 @@ static BOOL do_profile(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, pid, MSG_PROFILE, &v, sizeof(int), False); + return send_message(msg_ctx, pid, MSG_PROFILE, &v, sizeof(int)); } /* Return the profiling level */ @@ -480,7 +477,7 @@ static void profilelevel_rqst(struct messaging_context *msg_ctx, /* Send back a dummy reply */ - send_message(msg_ctx, pid, MSG_PROFILELEVEL, &v, sizeof(int), False); + send_message(msg_ctx, pid, MSG_PROFILELEVEL, &v, sizeof(int)); } static BOOL do_profilelevel(struct messaging_context *msg_ctx, @@ -494,7 +491,7 @@ static BOOL do_profilelevel(struct messaging_context *msg_ctx, /* Send a message and register our interest in a reply */ - if (!send_message(msg_ctx, pid, MSG_REQ_PROFILELEVEL, NULL, 0, False)) + if (!send_message(msg_ctx, pid, MSG_REQ_PROFILELEVEL, NULL, 0)) return False; messaging_register(msg_ctx, NULL, MSG_PROFILELEVEL, profilelevel_cb); @@ -526,7 +523,7 @@ static BOOL do_debuglevel(struct messaging_context *msg_ctx, /* Send a message and register our interest in a reply */ - if (!send_message(msg_ctx, pid, MSG_REQ_DEBUGLEVEL, NULL, 0, False)) + if (!send_message(msg_ctx, pid, MSG_REQ_DEBUGLEVEL, NULL, 0)) return False; messaging_register(msg_ctx, NULL, MSG_DEBUGLEVEL, print_pid_string_cb); @@ -696,8 +693,8 @@ static BOOL do_closeshare(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_SMB_FORCE_TDIS, argv[1], strlen(argv[1]) + 1, False); + return send_message(msg_ctx, pid, MSG_SMB_FORCE_TDIS, argv[1], + strlen(argv[1]) + 1); } /* Force a SAM synchronisation */ @@ -711,8 +708,7 @@ static BOOL do_samsync(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_SMB_SAM_SYNC, NULL, 0, False); + return send_message(msg_ctx, pid, MSG_SMB_SAM_SYNC, NULL, 0); } /* Force a SAM replication */ @@ -726,8 +722,7 @@ static BOOL do_samrepl(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_SMB_SAM_REPL, NULL, 0, False); + return send_message(msg_ctx, pid, MSG_SMB_SAM_REPL, NULL, 0); } /* Display talloc pool usage */ @@ -745,7 +740,7 @@ static BOOL do_poolusage(struct messaging_context *msg_ctx, /* Send a message and register our interest in a reply */ - if (!send_message(msg_ctx, pid, MSG_REQ_POOL_USAGE, NULL, 0, False)) + if (!send_message(msg_ctx, pid, MSG_REQ_POOL_USAGE, NULL, 0)) return False; wait_replies(msg_ctx, procid_to_pid(&pid) == 0); @@ -771,8 +766,7 @@ static BOOL do_dmalloc_mark(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_REQ_DMALLOC_MARK, NULL, 0, False); + return send_message(msg_ctx, pid, MSG_REQ_DMALLOC_MARK, NULL, 0); } /* Perform a dmalloc changed */ @@ -787,8 +781,8 @@ static BOOL do_dmalloc_changed(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_REQ_DMALLOC_LOG_CHANGED, NULL, 0, False); + return send_message(msg_ctx, pid, MSG_REQ_DMALLOC_LOG_CHANGED, + NULL, 0); } /* Shutdown a server process */ @@ -802,7 +796,7 @@ static BOOL do_shutdown(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, pid, MSG_SHUTDOWN, NULL, 0, False); + return send_message(msg_ctx, pid, MSG_SHUTDOWN, NULL, 0); } /* Notify a driver upgrade */ @@ -817,8 +811,8 @@ static BOOL do_drvupgrade(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, - pid, MSG_DEBUG, argv[1], strlen(argv[1]) + 1, False); + return send_message(msg_ctx, pid, MSG_DEBUG, argv[1], + strlen(argv[1]) + 1); } static BOOL do_winbind_online(struct messaging_context *msg_ctx, @@ -852,7 +846,7 @@ static BOOL do_winbind_online(struct messaging_context *msg_ctx, tdb_delete_bystring(tdb, "WINBINDD_OFFLINE"); tdb_close(tdb); - return send_message(msg_ctx, pid, MSG_WINBIND_ONLINE, NULL, 0, False); + return send_message(msg_ctx, pid, MSG_WINBIND_ONLINE, NULL, 0); } static BOOL do_winbind_offline(struct messaging_context *msg_ctx, @@ -909,7 +903,7 @@ static BOOL do_winbind_offline(struct messaging_context *msg_ctx, tdb_store_bystring(tdb, "WINBINDD_OFFLINE", d, TDB_INSERT); ret = send_message(msg_ctx, pid, MSG_WINBIND_OFFLINE, - NULL, 0, False); + NULL, 0); /* Check that the entry "WINBINDD_OFFLINE" still exists. */ d = tdb_fetch_bystring( tdb, "WINBINDD_OFFLINE" ); @@ -944,7 +938,7 @@ static BOOL do_winbind_onlinestatus(struct messaging_context *msg_ctx, print_pid_string_cb); if (!send_message(msg_ctx, pid, MSG_WINBIND_ONLINESTATUS, &myid, - sizeof(myid), False)) + sizeof(myid))) return False; wait_replies(msg_ctx, procid_to_pid(&pid) == 0); @@ -969,8 +963,7 @@ static BOOL do_reload_config(struct messaging_context *msg_ctx, return False; } - return send_message(msg_ctx, pid, MSG_SMB_CONF_UPDATED, - NULL, 0, False); + return send_message(msg_ctx, pid, MSG_SMB_CONF_UPDATED, NULL, 0); } static void my_make_nmb_name( struct nmb_name *n, const char *name, int type) @@ -1018,8 +1011,7 @@ static BOOL do_nodestatus(struct messaging_context *msg_ctx, p.packet.nmb.question.question_type = 0x21; p.packet.nmb.question.question_class = 0x1; - return send_message(msg_ctx, pid, MSG_SEND_PACKET, &p, sizeof(p), - False); + return send_message(msg_ctx, pid, MSG_SEND_PACKET, &p, sizeof(p)); } /* A list of message type supported */