messaging: Remove messaging_handler_send
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
61
62 struct messaging_callback {
63         struct messaging_callback *prev, *next;
64         uint32_t msg_type;
65         void (*fn)(struct messaging_context *msg, void *private_data, 
66                    uint32_t msg_type, 
67                    struct server_id server_id, DATA_BLOB *data);
68         void *private_data;
69 };
70
71 struct messaging_context {
72         struct server_id id;
73         struct tevent_context *event_ctx;
74         struct messaging_callback *callbacks;
75
76         struct tevent_req **new_waiters;
77         size_t num_new_waiters;
78
79         struct tevent_req **waiters;
80         size_t num_waiters;
81
82         void *msg_dgm_ref;
83         struct messaging_backend *remote;
84
85         struct server_id_db *names_db;
86 };
87
88 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
89                                                struct messaging_rec *rec);
90 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
91                                    struct tevent_context *ev,
92                                    struct messaging_rec *rec);
93
94 /****************************************************************************
95  A useful function for testing the message system.
96 ****************************************************************************/
97
98 static void ping_message(struct messaging_context *msg_ctx,
99                          void *private_data,
100                          uint32_t msg_type,
101                          struct server_id src,
102                          DATA_BLOB *data)
103 {
104         struct server_id_buf idbuf;
105
106         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
107                   server_id_str_buf(src, &idbuf), (int)data->length,
108                   data->data ? (char *)data->data : ""));
109
110         messaging_send(msg_ctx, src, MSG_PONG, data);
111 }
112
113 struct messaging_rec *messaging_rec_create(
114         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
115         uint32_t msg_type, const struct iovec *iov, int iovlen,
116         const int *fds, size_t num_fds)
117 {
118         ssize_t buflen;
119         uint8_t *buf;
120         struct messaging_rec *result;
121
122         if (num_fds > INT8_MAX) {
123                 return NULL;
124         }
125
126         buflen = iov_buflen(iov, iovlen);
127         if (buflen == -1) {
128                 return NULL;
129         }
130         buf = talloc_array(mem_ctx, uint8_t, buflen);
131         if (buf == NULL) {
132                 return NULL;
133         }
134         iov_buf(iov, iovlen, buf, buflen);
135
136         {
137                 struct messaging_rec rec;
138                 int64_t fds64[num_fds];
139                 size_t i;
140
141                 for (i=0; i<num_fds; i++) {
142                         fds64[i] = fds[i];
143                 }
144
145                 rec = (struct messaging_rec) {
146                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
147                         .src = src, .dest = dst,
148                         .buf.data = buf, .buf.length = buflen,
149                         .num_fds = num_fds, .fds = fds64,
150                 };
151
152                 result = messaging_rec_dup(mem_ctx, &rec);
153         }
154
155         TALLOC_FREE(buf);
156
157         return result;
158 }
159
160 static void messaging_recv_cb(struct tevent_context *ev,
161                               const uint8_t *msg, size_t msg_len,
162                               int *fds, size_t num_fds,
163                               void *private_data)
164 {
165         struct messaging_context *msg_ctx = talloc_get_type_abort(
166                 private_data, struct messaging_context);
167         struct server_id_buf idbuf;
168         struct messaging_rec rec;
169         int64_t fds64[MIN(num_fds, INT8_MAX)];
170         size_t i;
171
172         if (msg_len < MESSAGE_HDR_LENGTH) {
173                 DBG_WARNING("message too short: %zu\n", msg_len);
174                 goto close_fail;
175         }
176
177         if (num_fds > INT8_MAX) {
178                 DBG_WARNING("too many fds: %zu\n", num_fds);
179                 goto close_fail;
180         }
181
182         /*
183          * "consume" the fds by copying them and setting
184          * the original variable to -1
185          */
186         for (i=0; i < num_fds; i++) {
187                 fds64[i] = fds[i];
188                 fds[i] = -1;
189         }
190
191         rec = (struct messaging_rec) {
192                 .msg_version = MESSAGE_VERSION,
193                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
194                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
195                 .num_fds = num_fds,
196                 .fds = fds64,
197         };
198
199         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
200
201         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
202                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
203                   server_id_str_buf(rec.src, &idbuf));
204
205         messaging_dispatch_rec(msg_ctx, ev, &rec);
206         return;
207
208 close_fail:
209         for (i=0; i < num_fds; i++) {
210                 close(fds[i]);
211         }
212 }
213
214 static int messaging_context_destructor(struct messaging_context *ctx)
215 {
216         size_t i;
217
218         for (i=0; i<ctx->num_new_waiters; i++) {
219                 if (ctx->new_waiters[i] != NULL) {
220                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
221                         ctx->new_waiters[i] = NULL;
222                 }
223         }
224         for (i=0; i<ctx->num_waiters; i++) {
225                 if (ctx->waiters[i] != NULL) {
226                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
227                         ctx->waiters[i] = NULL;
228                 }
229         }
230
231         return 0;
232 }
233
234 static const char *private_path(const char *name)
235 {
236         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
237 }
238
239 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
240                                         struct tevent_context *ev,
241                                         struct messaging_context **pmsg_ctx)
242 {
243         TALLOC_CTX *frame;
244         struct messaging_context *ctx;
245         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
246         int ret;
247         const char *lck_path;
248         const char *priv_path;
249         bool ok;
250
251         lck_path = lock_path("msg.lock");
252         if (lck_path == NULL) {
253                 return NT_STATUS_NO_MEMORY;
254         }
255
256         ok = directory_create_or_exist_strict(lck_path,
257                                               sec_initial_uid(),
258                                               0755);
259         if (!ok) {
260                 DBG_DEBUG("Could not create lock directory: %s\n",
261                           strerror(errno));
262                 return NT_STATUS_ACCESS_DENIED;
263         }
264
265         priv_path = private_path("msg.sock");
266         if (priv_path == NULL) {
267                 return NT_STATUS_NO_MEMORY;
268         }
269
270         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
271                                               0700);
272         if (!ok) {
273                 DBG_DEBUG("Could not create msg directory: %s\n",
274                           strerror(errno));
275                 return NT_STATUS_ACCESS_DENIED;
276         }
277
278         frame = talloc_stackframe();
279         if (frame == NULL) {
280                 return NT_STATUS_NO_MEMORY;
281         }
282
283         ctx = talloc_zero(frame, struct messaging_context);
284         if (ctx == NULL) {
285                 status = NT_STATUS_NO_MEMORY;
286                 goto done;
287         }
288
289         ctx->id = (struct server_id) {
290                 .pid = getpid(), .vnn = NONCLUSTER_VNN
291         };
292
293         ctx->event_ctx = ev;
294
295         sec_init();
296
297         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
298                                              ctx->event_ctx,
299                                              &ctx->id.unique_id,
300                                              priv_path,
301                                              lck_path,
302                                              messaging_recv_cb,
303                                              ctx,
304                                              &ret);
305         if (ctx->msg_dgm_ref == NULL) {
306                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
307                 status = map_nt_error_from_unix(ret);
308                 goto done;
309         }
310         talloc_set_destructor(ctx, messaging_context_destructor);
311
312         if (lp_clustering()) {
313                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
314
315                 if (ret != 0) {
316                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
317                                   strerror(ret)));
318                         status = map_nt_error_from_unix(ret);
319                         goto done;
320                 }
321         }
322         ctx->id.vnn = get_my_vnn();
323
324         ctx->names_db = server_id_db_init(ctx,
325                                           ctx->id,
326                                           lp_lock_directory(),
327                                           0,
328                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
329         if (ctx->names_db == NULL) {
330                 DBG_DEBUG("server_id_db_init failed\n");
331                 status = NT_STATUS_NO_MEMORY;
332                 goto done;
333         }
334
335         messaging_register(ctx, NULL, MSG_PING, ping_message);
336
337         /* Register some debugging related messages */
338
339         register_msg_pool_usage(ctx);
340         register_dmalloc_msgs(ctx);
341         debug_register_msgs(ctx);
342
343         {
344                 struct server_id_buf tmp;
345                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
346         }
347
348         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
349
350         status = NT_STATUS_OK;
351 done:
352         TALLOC_FREE(frame);
353
354         return status;
355 }
356
357 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
358                                          struct tevent_context *ev)
359 {
360         struct messaging_context *ctx = NULL;
361         NTSTATUS status;
362
363         status = messaging_init_internal(mem_ctx,
364                                          ev,
365                                          &ctx);
366         if (!NT_STATUS_IS_OK(status)) {
367                 return NULL;
368         }
369
370         return ctx;
371 }
372
373 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
374                                struct tevent_context *ev,
375                                struct messaging_context **pmsg_ctx)
376 {
377         return messaging_init_internal(mem_ctx,
378                                         ev,
379                                         pmsg_ctx);
380 }
381
382 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
383 {
384         return msg_ctx->id;
385 }
386
387 /*
388  * re-init after a fork
389  */
390 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
391 {
392         int ret;
393         char *lck_path;
394
395         TALLOC_FREE(msg_ctx->msg_dgm_ref);
396
397         msg_ctx->id = (struct server_id) {
398                 .pid = getpid(), .vnn = msg_ctx->id.vnn
399         };
400
401         lck_path = lock_path("msg.lock");
402         if (lck_path == NULL) {
403                 return NT_STATUS_NO_MEMORY;
404         }
405
406         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
407                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
408                 private_path("msg.sock"), lck_path,
409                 messaging_recv_cb, msg_ctx, &ret);
410
411         if (msg_ctx->msg_dgm_ref == NULL) {
412                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
413                 return map_nt_error_from_unix(ret);
414         }
415
416         if (lp_clustering()) {
417                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
418                                              msg_ctx->remote);
419
420                 if (ret != 0) {
421                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
422                                   strerror(ret)));
423                         return map_nt_error_from_unix(ret);
424                 }
425         }
426
427         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
428
429         return NT_STATUS_OK;
430 }
431
432
433 /*
434  * Register a dispatch function for a particular message type. Allow multiple
435  * registrants
436 */
437 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
438                             void *private_data,
439                             uint32_t msg_type,
440                             void (*fn)(struct messaging_context *msg,
441                                        void *private_data, 
442                                        uint32_t msg_type, 
443                                        struct server_id server_id,
444                                        DATA_BLOB *data))
445 {
446         struct messaging_callback *cb;
447
448         DEBUG(5, ("Registering messaging pointer for type %u - "
449                   "private_data=%p\n",
450                   (unsigned)msg_type, private_data));
451
452         /*
453          * Only one callback per type
454          */
455
456         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
457                 /* we allow a second registration of the same message
458                    type if it has a different private pointer. This is
459                    needed in, for example, the internal notify code,
460                    which creates a new notify context for each tree
461                    connect, and expects to receive messages to each of
462                    them. */
463                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
464                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
465                                   (unsigned)msg_type, private_data));
466                         cb->fn = fn;
467                         cb->private_data = private_data;
468                         return NT_STATUS_OK;
469                 }
470         }
471
472         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
473                 return NT_STATUS_NO_MEMORY;
474         }
475
476         cb->msg_type = msg_type;
477         cb->fn = fn;
478         cb->private_data = private_data;
479
480         DLIST_ADD(msg_ctx->callbacks, cb);
481         return NT_STATUS_OK;
482 }
483
484 /*
485   De-register the function for a particular message type.
486 */
487 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
488                           void *private_data)
489 {
490         struct messaging_callback *cb, *next;
491
492         for (cb = ctx->callbacks; cb; cb = next) {
493                 next = cb->next;
494                 if ((cb->msg_type == msg_type)
495                     && (cb->private_data == private_data)) {
496                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
497                                   (unsigned)msg_type, private_data));
498                         DLIST_REMOVE(ctx->callbacks, cb);
499                         TALLOC_FREE(cb);
500                 }
501         }
502 }
503
504 /*
505   Send a message to a particular server
506 */
507 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
508                         struct server_id server, uint32_t msg_type,
509                         const DATA_BLOB *data)
510 {
511         struct iovec iov = {0};
512
513         if (data != NULL) {
514                 iov.iov_base = data->data;
515                 iov.iov_len = data->length;
516         };
517
518         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
519 }
520
521 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
522                             struct server_id server, uint32_t msg_type,
523                             const uint8_t *buf, size_t len)
524 {
525         DATA_BLOB blob = data_blob_const(buf, len);
526         return messaging_send(msg_ctx, server, msg_type, &blob);
527 }
528
529 struct messaging_post_state {
530         struct messaging_context *msg_ctx;
531         struct messaging_rec *rec;
532 };
533
534 static void messaging_post_handler(struct tevent_context *ev,
535                                    struct tevent_immediate *ti,
536                                    void *private_data);
537
538 static int messaging_post_self(struct messaging_context *msg_ctx,
539                                struct server_id src, struct server_id dst,
540                                uint32_t msg_type,
541                                const struct iovec *iov, int iovlen,
542                                const int *fds, size_t num_fds)
543 {
544         struct tevent_immediate *ti;
545         struct messaging_post_state *state;
546
547         state = talloc(msg_ctx, struct messaging_post_state);
548         if (state == NULL) {
549                 return ENOMEM;
550         }
551         state->msg_ctx = msg_ctx;
552
553         ti = tevent_create_immediate(state);
554         if (ti == NULL) {
555                 goto fail;
556         }
557         state->rec = messaging_rec_create(
558                 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
559         if (state->rec == NULL) {
560                 goto fail;
561         }
562
563         tevent_schedule_immediate(ti, msg_ctx->event_ctx,
564                                   messaging_post_handler, state);
565         return 0;
566
567 fail:
568         TALLOC_FREE(state);
569         return ENOMEM;
570 }
571
572 static void messaging_post_handler(struct tevent_context *ev,
573                                    struct tevent_immediate *ti,
574                                    void *private_data)
575 {
576         struct messaging_post_state *state = talloc_get_type_abort(
577                 private_data, struct messaging_post_state);
578         messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
579         TALLOC_FREE(state);
580 }
581
582 int messaging_send_iov_from(struct messaging_context *msg_ctx,
583                             struct server_id src, struct server_id dst,
584                             uint32_t msg_type,
585                             const struct iovec *iov, int iovlen,
586                             const int *fds, size_t num_fds)
587 {
588         int ret;
589         uint8_t hdr[MESSAGE_HDR_LENGTH];
590         struct iovec iov2[iovlen+1];
591
592         if (server_id_is_disconnected(&dst)) {
593                 return EINVAL;
594         }
595
596         if (num_fds > INT8_MAX) {
597                 return EINVAL;
598         }
599
600         if (dst.vnn != msg_ctx->id.vnn) {
601                 if (num_fds > 0) {
602                         return ENOSYS;
603                 }
604
605                 ret = msg_ctx->remote->send_fn(src, dst,
606                                                msg_type, iov, iovlen,
607                                                NULL, 0,
608                                                msg_ctx->remote);
609                 return ret;
610         }
611
612         if (server_id_equal(&dst, &msg_ctx->id)) {
613                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
614                                           iov, iovlen, fds, num_fds);
615                 return ret;
616         }
617
618         message_hdr_put(hdr, msg_type, src, dst);
619         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
620         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
621
622         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
623
624         if (ret == EACCES) {
625                 become_root();
626                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
627                                          fds, num_fds);
628                 unbecome_root();
629         }
630
631         if (ret == ECONNREFUSED) {
632                 /*
633                  * Linux returns this when a socket exists in the file
634                  * system without a listening process. This is not
635                  * documented in susv4 or the linux manpages, but it's
636                  * easily testable. For the higher levels this is the
637                  * same as "destination does not exist"
638                  */
639                 ret = ENOENT;
640         }
641
642         return ret;
643 }
644
645 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
646                             struct server_id dst, uint32_t msg_type,
647                             const struct iovec *iov, int iovlen,
648                             const int *fds, size_t num_fds)
649 {
650         int ret;
651
652         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
653                                       iov, iovlen, fds, num_fds);
654         if (ret != 0) {
655                 return map_nt_error_from_unix(ret);
656         }
657         return NT_STATUS_OK;
658 }
659
660 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
661                                                struct messaging_rec *rec)
662 {
663         struct messaging_rec *result;
664         size_t fds_size = sizeof(int64_t) * rec->num_fds;
665         size_t payload_len;
666
667         payload_len = rec->buf.length + fds_size;
668         if (payload_len < rec->buf.length) {
669                 /* overflow */
670                 return NULL;
671         }
672
673         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
674                                       payload_len);
675         if (result == NULL) {
676                 return NULL;
677         }
678         *result = *rec;
679
680         /* Doesn't fail, see talloc_pooled_object */
681
682         result->buf.data = talloc_memdup(result, rec->buf.data,
683                                          rec->buf.length);
684
685         result->fds = NULL;
686         if (result->num_fds > 0) {
687                 result->fds = talloc_memdup(result, rec->fds, fds_size);
688         }
689
690         return result;
691 }
692
693 struct messaging_filtered_read_state {
694         struct tevent_context *ev;
695         struct messaging_context *msg_ctx;
696         struct messaging_dgm_fde *fde;
697
698         bool (*filter)(struct messaging_rec *rec, void *private_data);
699         void *private_data;
700
701         struct messaging_rec *rec;
702 };
703
704 static void messaging_filtered_read_cleanup(struct tevent_req *req,
705                                             enum tevent_req_state req_state);
706
707 struct tevent_req *messaging_filtered_read_send(
708         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
709         struct messaging_context *msg_ctx,
710         bool (*filter)(struct messaging_rec *rec, void *private_data),
711         void *private_data)
712 {
713         struct tevent_req *req;
714         struct messaging_filtered_read_state *state;
715         size_t new_waiters_len;
716
717         req = tevent_req_create(mem_ctx, &state,
718                                 struct messaging_filtered_read_state);
719         if (req == NULL) {
720                 return NULL;
721         }
722         state->ev = ev;
723         state->msg_ctx = msg_ctx;
724         state->filter = filter;
725         state->private_data = private_data;
726
727         /*
728          * We have to defer the callback here, as we might be called from
729          * within a different tevent_context than state->ev
730          */
731         tevent_req_defer_callback(req, state->ev);
732
733         state->fde = messaging_dgm_register_tevent_context(state, ev);
734         if (tevent_req_nomem(state->fde, req)) {
735                 return tevent_req_post(req, ev);
736         }
737
738         /*
739          * We add ourselves to the "new_waiters" array, not the "waiters"
740          * array. If we are called from within messaging_read_done,
741          * messaging_dispatch_rec will be in an active for-loop on
742          * "waiters". We must be careful not to mess with this array, because
743          * it could mean that a single event is being delivered twice.
744          */
745
746         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
747
748         if (new_waiters_len == msg_ctx->num_new_waiters) {
749                 struct tevent_req **tmp;
750
751                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
752                                      struct tevent_req *, new_waiters_len+1);
753                 if (tevent_req_nomem(tmp, req)) {
754                         return tevent_req_post(req, ev);
755                 }
756                 msg_ctx->new_waiters = tmp;
757         }
758
759         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
760         msg_ctx->num_new_waiters += 1;
761         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
762
763         return req;
764 }
765
766 static void messaging_filtered_read_cleanup(struct tevent_req *req,
767                                             enum tevent_req_state req_state)
768 {
769         struct messaging_filtered_read_state *state = tevent_req_data(
770                 req, struct messaging_filtered_read_state);
771         struct messaging_context *msg_ctx = state->msg_ctx;
772         size_t i;
773
774         tevent_req_set_cleanup_fn(req, NULL);
775
776         TALLOC_FREE(state->fde);
777
778         /*
779          * Just set the [new_]waiters entry to NULL, be careful not to mess
780          * with the other "waiters" array contents. We are often called from
781          * within "messaging_dispatch_rec", which loops over
782          * "waiters". Messing with the "waiters" array will mess up that
783          * for-loop.
784          */
785
786         for (i=0; i<msg_ctx->num_waiters; i++) {
787                 if (msg_ctx->waiters[i] == req) {
788                         msg_ctx->waiters[i] = NULL;
789                         return;
790                 }
791         }
792
793         for (i=0; i<msg_ctx->num_new_waiters; i++) {
794                 if (msg_ctx->new_waiters[i] == req) {
795                         msg_ctx->new_waiters[i] = NULL;
796                         return;
797                 }
798         }
799 }
800
801 static void messaging_filtered_read_done(struct tevent_req *req,
802                                          struct messaging_rec *rec)
803 {
804         struct messaging_filtered_read_state *state = tevent_req_data(
805                 req, struct messaging_filtered_read_state);
806
807         state->rec = messaging_rec_dup(state, rec);
808         if (tevent_req_nomem(state->rec, req)) {
809                 return;
810         }
811         tevent_req_done(req);
812 }
813
814 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
815                                  struct messaging_rec **presult)
816 {
817         struct messaging_filtered_read_state *state = tevent_req_data(
818                 req, struct messaging_filtered_read_state);
819         int err;
820
821         if (tevent_req_is_unix_error(req, &err)) {
822                 tevent_req_received(req);
823                 return err;
824         }
825         if (presult != NULL) {
826                 *presult = talloc_move(mem_ctx, &state->rec);
827         }
828         return 0;
829 }
830
831 struct messaging_read_state {
832         uint32_t msg_type;
833         struct messaging_rec *rec;
834 };
835
836 static bool messaging_read_filter(struct messaging_rec *rec,
837                                   void *private_data);
838 static void messaging_read_done(struct tevent_req *subreq);
839
840 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
841                                        struct tevent_context *ev,
842                                        struct messaging_context *msg,
843                                        uint32_t msg_type)
844 {
845         struct tevent_req *req, *subreq;
846         struct messaging_read_state *state;
847
848         req = tevent_req_create(mem_ctx, &state,
849                                 struct messaging_read_state);
850         if (req == NULL) {
851                 return NULL;
852         }
853         state->msg_type = msg_type;
854
855         subreq = messaging_filtered_read_send(state, ev, msg,
856                                               messaging_read_filter, state);
857         if (tevent_req_nomem(subreq, req)) {
858                 return tevent_req_post(req, ev);
859         }
860         tevent_req_set_callback(subreq, messaging_read_done, req);
861         return req;
862 }
863
864 static bool messaging_read_filter(struct messaging_rec *rec,
865                                   void *private_data)
866 {
867         struct messaging_read_state *state = talloc_get_type_abort(
868                 private_data, struct messaging_read_state);
869
870         if (rec->num_fds != 0) {
871                 return false;
872         }
873
874         return rec->msg_type == state->msg_type;
875 }
876
877 static void messaging_read_done(struct tevent_req *subreq)
878 {
879         struct tevent_req *req = tevent_req_callback_data(
880                 subreq, struct tevent_req);
881         struct messaging_read_state *state = tevent_req_data(
882                 req, struct messaging_read_state);
883         int ret;
884
885         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
886         TALLOC_FREE(subreq);
887         if (tevent_req_error(req, ret)) {
888                 return;
889         }
890         tevent_req_done(req);
891 }
892
893 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
894                         struct messaging_rec **presult)
895 {
896         struct messaging_read_state *state = tevent_req_data(
897                 req, struct messaging_read_state);
898         int err;
899
900         if (tevent_req_is_unix_error(req, &err)) {
901                 return err;
902         }
903         if (presult != NULL) {
904                 *presult = talloc_move(mem_ctx, &state->rec);
905         }
906         return 0;
907 }
908
909 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
910 {
911         if (msg_ctx->num_new_waiters == 0) {
912                 return true;
913         }
914
915         if (talloc_array_length(msg_ctx->waiters) <
916             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
917                 struct tevent_req **tmp;
918                 tmp = talloc_realloc(
919                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
920                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
921                 if (tmp == NULL) {
922                         DEBUG(1, ("%s: talloc failed\n", __func__));
923                         return false;
924                 }
925                 msg_ctx->waiters = tmp;
926         }
927
928         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
929                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
930
931         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
932         msg_ctx->num_new_waiters = 0;
933
934         return true;
935 }
936
937 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
938                                        struct messaging_rec *rec)
939 {
940         struct messaging_callback *cb, *next;
941
942         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
943                 size_t j;
944
945                 next = cb->next;
946                 if (cb->msg_type != rec->msg_type) {
947                         continue;
948                 }
949
950                 /*
951                  * the old style callbacks don't support fd passing
952                  */
953                 for (j=0; j < rec->num_fds; j++) {
954                         int fd = rec->fds[j];
955                         close(fd);
956                 }
957                 rec->num_fds = 0;
958                 rec->fds = NULL;
959
960                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
961                        rec->src, &rec->buf);
962
963                 return true;
964         }
965
966         return false;
967 }
968
969 /*
970   Dispatch one messaging_rec
971 */
972 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
973                                    struct tevent_context *ev,
974                                    struct messaging_rec *rec)
975 {
976         size_t i;
977         bool consumed;
978
979         if (ev == msg_ctx->event_ctx) {
980                 consumed = messaging_dispatch_classic(msg_ctx, rec);
981                 if (consumed) {
982                         return;
983                 }
984         }
985
986         if (!messaging_append_new_waiters(msg_ctx)) {
987                 size_t j;
988                 for (j=0; j < rec->num_fds; j++) {
989                         int fd = rec->fds[j];
990                         close(fd);
991                 }
992                 rec->num_fds = 0;
993                 rec->fds = NULL;
994                 return;
995         }
996
997         i = 0;
998         while (i < msg_ctx->num_waiters) {
999                 struct tevent_req *req;
1000                 struct messaging_filtered_read_state *state;
1001
1002                 req = msg_ctx->waiters[i];
1003                 if (req == NULL) {
1004                         /*
1005                          * This got cleaned up. In the meantime,
1006                          * move everything down one. We need
1007                          * to keep the order of waiters, as
1008                          * other code may depend on this.
1009                          */
1010                         if (i < msg_ctx->num_waiters - 1) {
1011                                 memmove(&msg_ctx->waiters[i],
1012                                         &msg_ctx->waiters[i+1],
1013                                         sizeof(struct tevent_req *) *
1014                                             (msg_ctx->num_waiters - i - 1));
1015                         }
1016                         msg_ctx->num_waiters -= 1;
1017                         continue;
1018                 }
1019
1020                 state = tevent_req_data(
1021                         req, struct messaging_filtered_read_state);
1022                 if ((ev == state->ev) &&
1023                     state->filter(rec, state->private_data)) {
1024                         messaging_filtered_read_done(req, rec);
1025                         return;
1026                 }
1027
1028                 i += 1;
1029         }
1030
1031         if (ev != msg_ctx->event_ctx) {
1032                 struct iovec iov;
1033                 int fds[rec->num_fds];
1034                 int ret;
1035
1036                 /*
1037                  * We've been listening on a nested event
1038                  * context. Messages need to be handled in the main
1039                  * event context, so post to ourselves
1040                  */
1041
1042                 iov.iov_base = rec->buf.data;
1043                 iov.iov_len = rec->buf.length;
1044
1045                 for (i=0; i<rec->num_fds; i++) {
1046                         fds[i] = rec->fds[i];
1047                 }
1048
1049                 ret = messaging_post_self(
1050                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1051                         &iov, 1, fds, rec->num_fds);
1052                 if (ret == 0) {
1053                         return;
1054                 }
1055         }
1056
1057         /*
1058          * If the fd-array isn't used, just close it.
1059          */
1060         for (i=0; i < rec->num_fds; i++) {
1061                 int fd = rec->fds[i];
1062                 close(fd);
1063         }
1064         rec->num_fds = 0;
1065         rec->fds = NULL;
1066 }
1067
1068 static int mess_parent_dgm_cleanup(void *private_data);
1069 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1070
1071 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1072 {
1073         struct tevent_req *req;
1074
1075         req = background_job_send(
1076                 msg, msg->event_ctx, msg, NULL, 0,
1077                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1078                             60*15),
1079                 mess_parent_dgm_cleanup, msg);
1080         if (req == NULL) {
1081                 return false;
1082         }
1083         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1084         return true;
1085 }
1086
1087 static int mess_parent_dgm_cleanup(void *private_data)
1088 {
1089         int ret;
1090
1091         ret = messaging_dgm_wipe();
1092         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1093                    ret ? strerror(ret) : "ok"));
1094         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1095                            60*15);
1096 }
1097
1098 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1099 {
1100         struct messaging_context *msg = tevent_req_callback_data(
1101                 req, struct messaging_context);
1102         NTSTATUS status;
1103
1104         status = background_job_recv(req);
1105         TALLOC_FREE(req);
1106         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1107                   nt_errstr(status)));
1108
1109         req = background_job_send(
1110                 msg, msg->event_ctx, msg, NULL, 0,
1111                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1112                             60*15),
1113                 mess_parent_dgm_cleanup, msg);
1114         if (req == NULL) {
1115                 DEBUG(1, ("background_job_send failed\n"));
1116                 return;
1117         }
1118         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1119 }
1120
1121 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1122 {
1123         int ret;
1124
1125         if (pid == 0) {
1126                 ret = messaging_dgm_wipe();
1127         } else {
1128                 ret = messaging_dgm_cleanup(pid);
1129         }
1130
1131         return ret;
1132 }
1133
1134 struct tevent_context *messaging_tevent_context(
1135         struct messaging_context *msg_ctx)
1136 {
1137         return msg_ctx->event_ctx;
1138 }
1139
1140 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1141 {
1142         return msg_ctx->names_db;
1143 }
1144
1145 /** @} **/