lib: Add lib/util/server_id.h
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/util/iov_buf.h"
57 #include "lib/util/server_id_db.h"
58 #include "lib/messages_dgm_ref.h"
59 #include "lib/messages_util.h"
60
61 struct messaging_callback {
62         struct messaging_callback *prev, *next;
63         uint32_t msg_type;
64         void (*fn)(struct messaging_context *msg, void *private_data, 
65                    uint32_t msg_type, 
66                    struct server_id server_id, DATA_BLOB *data);
67         void *private_data;
68 };
69
70 struct messaging_context {
71         struct server_id id;
72         struct tevent_context *event_ctx;
73         struct messaging_callback *callbacks;
74
75         struct tevent_req **new_waiters;
76         unsigned num_new_waiters;
77
78         struct tevent_req **waiters;
79         unsigned num_waiters;
80
81         void *msg_dgm_ref;
82         struct messaging_backend *remote;
83
84         struct server_id_db *names_db;
85 };
86
87 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
88                                                struct messaging_rec *rec);
89 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
90                                    struct tevent_context *ev,
91                                    struct messaging_rec *rec);
92
93 /****************************************************************************
94  A useful function for testing the message system.
95 ****************************************************************************/
96
97 static void ping_message(struct messaging_context *msg_ctx,
98                          void *private_data,
99                          uint32_t msg_type,
100                          struct server_id src,
101                          DATA_BLOB *data)
102 {
103         struct server_id_buf idbuf;
104
105         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
106                   server_id_str_buf(src, &idbuf), (int)data->length,
107                   data->data ? (char *)data->data : ""));
108
109         messaging_send(msg_ctx, src, MSG_PONG, data);
110 }
111
112 static struct messaging_rec *messaging_rec_create(
113         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
114         uint32_t msg_type, const struct iovec *iov, int iovlen,
115         const int *fds, size_t num_fds)
116 {
117         ssize_t buflen;
118         uint8_t *buf;
119         struct messaging_rec *result;
120
121         if (num_fds > INT8_MAX) {
122                 return NULL;
123         }
124
125         buflen = iov_buflen(iov, iovlen);
126         if (buflen == -1) {
127                 return NULL;
128         }
129         buf = talloc_array(mem_ctx, uint8_t, buflen);
130         if (buf == NULL) {
131                 return NULL;
132         }
133         iov_buf(iov, iovlen, buf, buflen);
134
135         {
136                 struct messaging_rec rec;
137                 int64_t fds64[num_fds];
138                 size_t i;
139
140                 for (i=0; i<num_fds; i++) {
141                         fds64[i] = fds[i];
142                 }
143
144                 rec = (struct messaging_rec) {
145                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
146                         .src = src, .dest = dst,
147                         .buf.data = buf, .buf.length = buflen,
148                         .num_fds = num_fds, .fds = fds64,
149                 };
150
151                 result = messaging_rec_dup(mem_ctx, &rec);
152         }
153
154         TALLOC_FREE(buf);
155
156         return result;
157 }
158
159 static void messaging_recv_cb(struct tevent_context *ev,
160                               const uint8_t *msg, size_t msg_len,
161                               int *fds, size_t num_fds,
162                               void *private_data)
163 {
164         struct messaging_context *msg_ctx = talloc_get_type_abort(
165                 private_data, struct messaging_context);
166         struct server_id_buf idbuf;
167         struct messaging_rec rec;
168         int64_t fds64[MIN(num_fds, INT8_MAX)];
169         size_t i;
170
171         if (msg_len < MESSAGE_HDR_LENGTH) {
172                 DBG_WARNING("message too short: %zu\n", msg_len);
173                 goto close_fail;
174         }
175
176         if (num_fds > INT8_MAX) {
177                 DBG_WARNING("too many fds: %zu\n", num_fds);
178                 goto close_fail;
179         }
180
181         /*
182          * "consume" the fds by copying them and setting
183          * the original variable to -1
184          */
185         for (i=0; i < num_fds; i++) {
186                 fds64[i] = fds[i];
187                 fds[i] = -1;
188         }
189
190         rec = (struct messaging_rec) {
191                 .msg_version = MESSAGE_VERSION,
192                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
193                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
194                 .num_fds = num_fds,
195                 .fds = fds64,
196         };
197
198         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
199
200         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
201                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
202                   server_id_str_buf(rec.src, &idbuf));
203
204         messaging_dispatch_rec(msg_ctx, ev, &rec);
205         return;
206
207 close_fail:
208         for (i=0; i < num_fds; i++) {
209                 close(fds[i]);
210         }
211 }
212
213 static int messaging_context_destructor(struct messaging_context *ctx)
214 {
215         unsigned i;
216
217         for (i=0; i<ctx->num_new_waiters; i++) {
218                 if (ctx->new_waiters[i] != NULL) {
219                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
220                         ctx->new_waiters[i] = NULL;
221                 }
222         }
223         for (i=0; i<ctx->num_waiters; i++) {
224                 if (ctx->waiters[i] != NULL) {
225                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
226                         ctx->waiters[i] = NULL;
227                 }
228         }
229
230         return 0;
231 }
232
233 static const char *private_path(const char *name)
234 {
235         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
236 }
237
238 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
239                                         struct tevent_context *ev,
240                                         struct messaging_context **pmsg_ctx)
241 {
242         TALLOC_CTX *frame;
243         struct messaging_context *ctx;
244         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
245         int ret;
246         const char *lck_path;
247         const char *priv_path;
248         bool ok;
249
250         lck_path = lock_path("msg.lock");
251         if (lck_path == NULL) {
252                 return NT_STATUS_NO_MEMORY;
253         }
254
255         ok = directory_create_or_exist_strict(lck_path,
256                                               sec_initial_uid(),
257                                               0755);
258         if (!ok) {
259                 DBG_DEBUG("Could not create lock directory: %s\n",
260                           strerror(errno));
261                 return NT_STATUS_ACCESS_DENIED;
262         }
263
264         priv_path = private_path("msg.sock");
265         if (priv_path == NULL) {
266                 return NT_STATUS_NO_MEMORY;
267         }
268
269         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
270                                               0700);
271         if (!ok) {
272                 DBG_DEBUG("Could not create msg directory: %s\n",
273                           strerror(errno));
274                 return NT_STATUS_ACCESS_DENIED;
275         }
276
277         frame = talloc_stackframe();
278         if (frame == NULL) {
279                 return NT_STATUS_NO_MEMORY;
280         }
281
282         ctx = talloc_zero(frame, struct messaging_context);
283         if (ctx == NULL) {
284                 status = NT_STATUS_NO_MEMORY;
285                 goto done;
286         }
287
288         ctx->id = (struct server_id) {
289                 .pid = getpid(), .vnn = NONCLUSTER_VNN
290         };
291
292         ctx->event_ctx = ev;
293
294         sec_init();
295
296         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
297                                              ctx->event_ctx,
298                                              &ctx->id.unique_id,
299                                              priv_path,
300                                              lck_path,
301                                              messaging_recv_cb,
302                                              ctx,
303                                              &ret);
304         if (ctx->msg_dgm_ref == NULL) {
305                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
306                 status = NT_STATUS_INTERNAL_ERROR;
307                 goto done;
308         }
309         talloc_set_destructor(ctx, messaging_context_destructor);
310
311         if (lp_clustering()) {
312                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
313
314                 if (ret != 0) {
315                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
316                                   strerror(ret)));
317                         status = NT_STATUS_INTERNAL_ERROR;
318                         goto done;
319                 }
320         }
321         ctx->id.vnn = get_my_vnn();
322
323         ctx->names_db = server_id_db_init(ctx,
324                                           ctx->id,
325                                           lp_lock_directory(),
326                                           0,
327                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
328         if (ctx->names_db == NULL) {
329                 DBG_DEBUG("server_id_db_init failed\n");
330                 status = NT_STATUS_INTERNAL_ERROR;
331                 goto done;
332         }
333
334         messaging_register(ctx, NULL, MSG_PING, ping_message);
335
336         /* Register some debugging related messages */
337
338         register_msg_pool_usage(ctx);
339         register_dmalloc_msgs(ctx);
340         debug_register_msgs(ctx);
341
342         {
343                 struct server_id_buf tmp;
344                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
345         }
346
347         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
348
349         status = NT_STATUS_OK;
350 done:
351         TALLOC_FREE(frame);
352
353         return status;
354 }
355
356 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
357                                          struct tevent_context *ev)
358 {
359         struct messaging_context *ctx = NULL;
360         NTSTATUS status;
361
362         status = messaging_init_internal(mem_ctx,
363                                          ev,
364                                          &ctx);
365         if (!NT_STATUS_IS_OK(status)) {
366                 return NULL;
367         }
368
369         return ctx;
370 }
371
372 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
373                                struct tevent_context *ev,
374                                struct messaging_context **pmsg_ctx)
375 {
376         return messaging_init_internal(mem_ctx,
377                                         ev,
378                                         pmsg_ctx);
379 }
380
381 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
382 {
383         return msg_ctx->id;
384 }
385
386 /*
387  * re-init after a fork
388  */
389 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
390 {
391         int ret;
392         char *lck_path;
393
394         TALLOC_FREE(msg_ctx->msg_dgm_ref);
395
396         msg_ctx->id = (struct server_id) {
397                 .pid = getpid(), .vnn = msg_ctx->id.vnn
398         };
399
400         lck_path = lock_path("msg.lock");
401         if (lck_path == NULL) {
402                 return NT_STATUS_NO_MEMORY;
403         }
404
405         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
406                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
407                 private_path("msg.sock"), lck_path,
408                 messaging_recv_cb, msg_ctx, &ret);
409
410         if (msg_ctx->msg_dgm_ref == NULL) {
411                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
412                 return map_nt_error_from_unix(ret);
413         }
414
415         if (lp_clustering()) {
416                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
417                                              msg_ctx->remote);
418
419                 if (ret != 0) {
420                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
421                                   strerror(ret)));
422                         return map_nt_error_from_unix(ret);
423                 }
424         }
425
426         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
427
428         return NT_STATUS_OK;
429 }
430
431
432 /*
433  * Register a dispatch function for a particular message type. Allow multiple
434  * registrants
435 */
436 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
437                             void *private_data,
438                             uint32_t msg_type,
439                             void (*fn)(struct messaging_context *msg,
440                                        void *private_data, 
441                                        uint32_t msg_type, 
442                                        struct server_id server_id,
443                                        DATA_BLOB *data))
444 {
445         struct messaging_callback *cb;
446
447         DEBUG(5, ("Registering messaging pointer for type %u - "
448                   "private_data=%p\n",
449                   (unsigned)msg_type, private_data));
450
451         /*
452          * Only one callback per type
453          */
454
455         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
456                 /* we allow a second registration of the same message
457                    type if it has a different private pointer. This is
458                    needed in, for example, the internal notify code,
459                    which creates a new notify context for each tree
460                    connect, and expects to receive messages to each of
461                    them. */
462                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
463                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
464                                   (unsigned)msg_type, private_data));
465                         cb->fn = fn;
466                         cb->private_data = private_data;
467                         return NT_STATUS_OK;
468                 }
469         }
470
471         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
472                 return NT_STATUS_NO_MEMORY;
473         }
474
475         cb->msg_type = msg_type;
476         cb->fn = fn;
477         cb->private_data = private_data;
478
479         DLIST_ADD(msg_ctx->callbacks, cb);
480         return NT_STATUS_OK;
481 }
482
483 /*
484   De-register the function for a particular message type.
485 */
486 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
487                           void *private_data)
488 {
489         struct messaging_callback *cb, *next;
490
491         for (cb = ctx->callbacks; cb; cb = next) {
492                 next = cb->next;
493                 if ((cb->msg_type == msg_type)
494                     && (cb->private_data == private_data)) {
495                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
496                                   (unsigned)msg_type, private_data));
497                         DLIST_REMOVE(ctx->callbacks, cb);
498                         TALLOC_FREE(cb);
499                 }
500         }
501 }
502
503 /*
504   Send a message to a particular server
505 */
506 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
507                         struct server_id server, uint32_t msg_type,
508                         const DATA_BLOB *data)
509 {
510         struct iovec iov = {0};
511
512         if (data != NULL) {
513                 iov.iov_base = data->data;
514                 iov.iov_len = data->length;
515         };
516
517         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
518 }
519
520 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
521                             struct server_id server, uint32_t msg_type,
522                             const uint8_t *buf, size_t len)
523 {
524         DATA_BLOB blob = data_blob_const(buf, len);
525         return messaging_send(msg_ctx, server, msg_type, &blob);
526 }
527
528 struct messaging_post_state {
529         struct messaging_context *msg_ctx;
530         struct messaging_rec *rec;
531 };
532
533 static void messaging_post_handler(struct tevent_context *ev,
534                                    struct tevent_immediate *ti,
535                                    void *private_data);
536
537 static int messaging_post_self(struct messaging_context *msg_ctx,
538                                struct server_id src, struct server_id dst,
539                                uint32_t msg_type,
540                                const struct iovec *iov, int iovlen,
541                                const int *fds, size_t num_fds)
542 {
543         struct tevent_immediate *ti;
544         struct messaging_post_state *state;
545
546         state = talloc(msg_ctx, struct messaging_post_state);
547         if (state == NULL) {
548                 return ENOMEM;
549         }
550         state->msg_ctx = msg_ctx;
551
552         ti = tevent_create_immediate(state);
553         if (ti == NULL) {
554                 goto fail;
555         }
556         state->rec = messaging_rec_create(
557                 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
558         if (state->rec == NULL) {
559                 goto fail;
560         }
561
562         tevent_schedule_immediate(ti, msg_ctx->event_ctx,
563                                   messaging_post_handler, state);
564         return 0;
565
566 fail:
567         TALLOC_FREE(state);
568         return ENOMEM;
569 }
570
571 static void messaging_post_handler(struct tevent_context *ev,
572                                    struct tevent_immediate *ti,
573                                    void *private_data)
574 {
575         struct messaging_post_state *state = talloc_get_type_abort(
576                 private_data, struct messaging_post_state);
577         messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
578         TALLOC_FREE(state);
579 }
580
581 int messaging_send_iov_from(struct messaging_context *msg_ctx,
582                             struct server_id src, struct server_id dst,
583                             uint32_t msg_type,
584                             const struct iovec *iov, int iovlen,
585                             const int *fds, size_t num_fds)
586 {
587         int ret;
588         uint8_t hdr[MESSAGE_HDR_LENGTH];
589         struct iovec iov2[iovlen+1];
590
591         if (server_id_is_disconnected(&dst)) {
592                 return EINVAL;
593         }
594
595         if (num_fds > INT8_MAX) {
596                 return EINVAL;
597         }
598
599         if (dst.vnn != msg_ctx->id.vnn) {
600                 if (num_fds > 0) {
601                         return ENOSYS;
602                 }
603
604                 ret = msg_ctx->remote->send_fn(src, dst,
605                                                msg_type, iov, iovlen,
606                                                NULL, 0,
607                                                msg_ctx->remote);
608                 return ret;
609         }
610
611         if (server_id_equal(&dst, &msg_ctx->id)) {
612                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
613                                           iov, iovlen, fds, num_fds);
614                 return ret;
615         }
616
617         message_hdr_put(hdr, msg_type, src, dst);
618         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
619         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
620
621         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
622
623         if (ret == EACCES) {
624                 become_root();
625                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
626                                          fds, num_fds);
627                 unbecome_root();
628         }
629
630         if (ret == ECONNREFUSED) {
631                 /*
632                  * Linux returns this when a socket exists in the file
633                  * system without a listening process. This is not
634                  * documented in susv4 or the linux manpages, but it's
635                  * easily testable. For the higher levels this is the
636                  * same as "destination does not exist"
637                  */
638                 ret = ENOENT;
639         }
640
641         return ret;
642 }
643
644 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
645                             struct server_id dst, uint32_t msg_type,
646                             const struct iovec *iov, int iovlen,
647                             const int *fds, size_t num_fds)
648 {
649         int ret;
650
651         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
652                                       iov, iovlen, fds, num_fds);
653         if (ret != 0) {
654                 return map_nt_error_from_unix(ret);
655         }
656         return NT_STATUS_OK;
657 }
658
659 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
660                                                struct messaging_rec *rec)
661 {
662         struct messaging_rec *result;
663         size_t fds_size = sizeof(int64_t) * rec->num_fds;
664         size_t payload_len;
665
666         payload_len = rec->buf.length + fds_size;
667         if (payload_len < rec->buf.length) {
668                 /* overflow */
669                 return NULL;
670         }
671
672         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
673                                       payload_len);
674         if (result == NULL) {
675                 return NULL;
676         }
677         *result = *rec;
678
679         /* Doesn't fail, see talloc_pooled_object */
680
681         result->buf.data = talloc_memdup(result, rec->buf.data,
682                                          rec->buf.length);
683
684         result->fds = NULL;
685         if (result->num_fds > 0) {
686                 result->fds = talloc_memdup(result, rec->fds, fds_size);
687         }
688
689         return result;
690 }
691
692 struct messaging_filtered_read_state {
693         struct tevent_context *ev;
694         struct messaging_context *msg_ctx;
695         struct messaging_dgm_fde *fde;
696
697         bool (*filter)(struct messaging_rec *rec, void *private_data);
698         void *private_data;
699
700         struct messaging_rec *rec;
701 };
702
703 static void messaging_filtered_read_cleanup(struct tevent_req *req,
704                                             enum tevent_req_state req_state);
705
706 struct tevent_req *messaging_filtered_read_send(
707         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
708         struct messaging_context *msg_ctx,
709         bool (*filter)(struct messaging_rec *rec, void *private_data),
710         void *private_data)
711 {
712         struct tevent_req *req;
713         struct messaging_filtered_read_state *state;
714         size_t new_waiters_len;
715
716         req = tevent_req_create(mem_ctx, &state,
717                                 struct messaging_filtered_read_state);
718         if (req == NULL) {
719                 return NULL;
720         }
721         state->ev = ev;
722         state->msg_ctx = msg_ctx;
723         state->filter = filter;
724         state->private_data = private_data;
725
726         /*
727          * We have to defer the callback here, as we might be called from
728          * within a different tevent_context than state->ev
729          */
730         tevent_req_defer_callback(req, state->ev);
731
732         state->fde = messaging_dgm_register_tevent_context(state, ev);
733         if (tevent_req_nomem(state->fde, req)) {
734                 return tevent_req_post(req, ev);
735         }
736
737         /*
738          * We add ourselves to the "new_waiters" array, not the "waiters"
739          * array. If we are called from within messaging_read_done,
740          * messaging_dispatch_rec will be in an active for-loop on
741          * "waiters". We must be careful not to mess with this array, because
742          * it could mean that a single event is being delivered twice.
743          */
744
745         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
746
747         if (new_waiters_len == msg_ctx->num_new_waiters) {
748                 struct tevent_req **tmp;
749
750                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
751                                      struct tevent_req *, new_waiters_len+1);
752                 if (tevent_req_nomem(tmp, req)) {
753                         return tevent_req_post(req, ev);
754                 }
755                 msg_ctx->new_waiters = tmp;
756         }
757
758         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
759         msg_ctx->num_new_waiters += 1;
760         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
761
762         return req;
763 }
764
765 static void messaging_filtered_read_cleanup(struct tevent_req *req,
766                                             enum tevent_req_state req_state)
767 {
768         struct messaging_filtered_read_state *state = tevent_req_data(
769                 req, struct messaging_filtered_read_state);
770         struct messaging_context *msg_ctx = state->msg_ctx;
771         unsigned i;
772
773         tevent_req_set_cleanup_fn(req, NULL);
774
775         TALLOC_FREE(state->fde);
776
777         /*
778          * Just set the [new_]waiters entry to NULL, be careful not to mess
779          * with the other "waiters" array contents. We are often called from
780          * within "messaging_dispatch_rec", which loops over
781          * "waiters". Messing with the "waiters" array will mess up that
782          * for-loop.
783          */
784
785         for (i=0; i<msg_ctx->num_waiters; i++) {
786                 if (msg_ctx->waiters[i] == req) {
787                         msg_ctx->waiters[i] = NULL;
788                         return;
789                 }
790         }
791
792         for (i=0; i<msg_ctx->num_new_waiters; i++) {
793                 if (msg_ctx->new_waiters[i] == req) {
794                         msg_ctx->new_waiters[i] = NULL;
795                         return;
796                 }
797         }
798 }
799
800 static void messaging_filtered_read_done(struct tevent_req *req,
801                                          struct messaging_rec *rec)
802 {
803         struct messaging_filtered_read_state *state = tevent_req_data(
804                 req, struct messaging_filtered_read_state);
805
806         state->rec = messaging_rec_dup(state, rec);
807         if (tevent_req_nomem(state->rec, req)) {
808                 return;
809         }
810         tevent_req_done(req);
811 }
812
813 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
814                                  struct messaging_rec **presult)
815 {
816         struct messaging_filtered_read_state *state = tevent_req_data(
817                 req, struct messaging_filtered_read_state);
818         int err;
819
820         if (tevent_req_is_unix_error(req, &err)) {
821                 tevent_req_received(req);
822                 return err;
823         }
824         if (presult != NULL) {
825                 *presult = talloc_move(mem_ctx, &state->rec);
826         }
827         return 0;
828 }
829
830 struct messaging_read_state {
831         uint32_t msg_type;
832         struct messaging_rec *rec;
833 };
834
835 static bool messaging_read_filter(struct messaging_rec *rec,
836                                   void *private_data);
837 static void messaging_read_done(struct tevent_req *subreq);
838
839 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
840                                        struct tevent_context *ev,
841                                        struct messaging_context *msg,
842                                        uint32_t msg_type)
843 {
844         struct tevent_req *req, *subreq;
845         struct messaging_read_state *state;
846
847         req = tevent_req_create(mem_ctx, &state,
848                                 struct messaging_read_state);
849         if (req == NULL) {
850                 return NULL;
851         }
852         state->msg_type = msg_type;
853
854         subreq = messaging_filtered_read_send(state, ev, msg,
855                                               messaging_read_filter, state);
856         if (tevent_req_nomem(subreq, req)) {
857                 return tevent_req_post(req, ev);
858         }
859         tevent_req_set_callback(subreq, messaging_read_done, req);
860         return req;
861 }
862
863 static bool messaging_read_filter(struct messaging_rec *rec,
864                                   void *private_data)
865 {
866         struct messaging_read_state *state = talloc_get_type_abort(
867                 private_data, struct messaging_read_state);
868
869         if (rec->num_fds != 0) {
870                 return false;
871         }
872
873         return rec->msg_type == state->msg_type;
874 }
875
876 static void messaging_read_done(struct tevent_req *subreq)
877 {
878         struct tevent_req *req = tevent_req_callback_data(
879                 subreq, struct tevent_req);
880         struct messaging_read_state *state = tevent_req_data(
881                 req, struct messaging_read_state);
882         int ret;
883
884         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
885         TALLOC_FREE(subreq);
886         if (tevent_req_error(req, ret)) {
887                 return;
888         }
889         tevent_req_done(req);
890 }
891
892 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
893                         struct messaging_rec **presult)
894 {
895         struct messaging_read_state *state = tevent_req_data(
896                 req, struct messaging_read_state);
897         int err;
898
899         if (tevent_req_is_unix_error(req, &err)) {
900                 return err;
901         }
902         if (presult != NULL) {
903                 *presult = talloc_move(mem_ctx, &state->rec);
904         }
905         return 0;
906 }
907
908 struct messaging_handler_state {
909         struct tevent_context *ev;
910         struct messaging_context *msg_ctx;
911         uint32_t msg_type;
912         bool (*handler)(struct messaging_context *msg_ctx,
913                         struct messaging_rec **rec, void *private_data);
914         void *private_data;
915 };
916
917 static void messaging_handler_got_msg(struct tevent_req *subreq);
918
919 struct tevent_req *messaging_handler_send(
920         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
921         struct messaging_context *msg_ctx, uint32_t msg_type,
922         bool (*handler)(struct messaging_context *msg_ctx,
923                         struct messaging_rec **rec, void *private_data),
924         void *private_data)
925 {
926         struct tevent_req *req, *subreq;
927         struct messaging_handler_state *state;
928
929         req = tevent_req_create(mem_ctx, &state,
930                                 struct messaging_handler_state);
931         if (req == NULL) {
932                 return NULL;
933         }
934         state->ev = ev;
935         state->msg_ctx = msg_ctx;
936         state->msg_type = msg_type;
937         state->handler = handler;
938         state->private_data = private_data;
939
940         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
941                                      state->msg_type);
942         if (tevent_req_nomem(subreq, req)) {
943                 return tevent_req_post(req, ev);
944         }
945         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
946         return req;
947 }
948
949 static void messaging_handler_got_msg(struct tevent_req *subreq)
950 {
951         struct tevent_req *req = tevent_req_callback_data(
952                 subreq, struct tevent_req);
953         struct messaging_handler_state *state = tevent_req_data(
954                 req, struct messaging_handler_state);
955         struct messaging_rec *rec;
956         int ret;
957         bool ok;
958
959         ret = messaging_read_recv(subreq, state, &rec);
960         TALLOC_FREE(subreq);
961         if (tevent_req_error(req, ret)) {
962                 return;
963         }
964
965         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
966                                      state->msg_type);
967         if (tevent_req_nomem(subreq, req)) {
968                 return;
969         }
970         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
971
972         ok = state->handler(state->msg_ctx, &rec, state->private_data);
973         TALLOC_FREE(rec);
974         if (ok) {
975                 /*
976                  * Next round
977                  */
978                 return;
979         }
980         TALLOC_FREE(subreq);
981         tevent_req_done(req);
982 }
983
984 int messaging_handler_recv(struct tevent_req *req)
985 {
986         return tevent_req_simple_recv_unix(req);
987 }
988
989 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
990 {
991         if (msg_ctx->num_new_waiters == 0) {
992                 return true;
993         }
994
995         if (talloc_array_length(msg_ctx->waiters) <
996             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
997                 struct tevent_req **tmp;
998                 tmp = talloc_realloc(
999                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1000                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1001                 if (tmp == NULL) {
1002                         DEBUG(1, ("%s: talloc failed\n", __func__));
1003                         return false;
1004                 }
1005                 msg_ctx->waiters = tmp;
1006         }
1007
1008         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1009                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1010
1011         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1012         msg_ctx->num_new_waiters = 0;
1013
1014         return true;
1015 }
1016
1017 static void messaging_dispatch_classic(struct messaging_context *msg_ctx,
1018                                        struct messaging_rec *rec)
1019 {
1020         struct messaging_callback *cb, *next;
1021
1022         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1023                 size_t j;
1024
1025                 next = cb->next;
1026                 if (cb->msg_type != rec->msg_type) {
1027                         continue;
1028                 }
1029
1030                 /*
1031                  * the old style callbacks don't support fd passing
1032                  */
1033                 for (j=0; j < rec->num_fds; j++) {
1034                         int fd = rec->fds[j];
1035                         close(fd);
1036                 }
1037                 rec->num_fds = 0;
1038                 rec->fds = NULL;
1039
1040                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1041                        rec->src, &rec->buf);
1042
1043                 /*
1044                  * we continue looking for matching messages after finding
1045                  * one. This matters for subsystems like the internal notify
1046                  * code which register more than one handler for the same
1047                  * message type
1048                  */
1049         }
1050 }
1051
1052 /*
1053   Dispatch one messaging_rec
1054 */
1055 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1056                                    struct tevent_context *ev,
1057                                    struct messaging_rec *rec)
1058 {
1059         unsigned i;
1060         size_t j;
1061
1062         if (ev == msg_ctx->event_ctx) {
1063                 messaging_dispatch_classic(msg_ctx, rec);
1064         }
1065
1066         if (!messaging_append_new_waiters(msg_ctx)) {
1067                 for (j=0; j < rec->num_fds; j++) {
1068                         int fd = rec->fds[j];
1069                         close(fd);
1070                 }
1071                 rec->num_fds = 0;
1072                 rec->fds = NULL;
1073                 return;
1074         }
1075
1076         i = 0;
1077         while (i < msg_ctx->num_waiters) {
1078                 struct tevent_req *req;
1079                 struct messaging_filtered_read_state *state;
1080
1081                 req = msg_ctx->waiters[i];
1082                 if (req == NULL) {
1083                         /*
1084                          * This got cleaned up. In the meantime,
1085                          * move everything down one. We need
1086                          * to keep the order of waiters, as
1087                          * other code may depend on this.
1088                          */
1089                         if (i < msg_ctx->num_waiters - 1) {
1090                                 memmove(&msg_ctx->waiters[i],
1091                                         &msg_ctx->waiters[i+1],
1092                                         sizeof(struct tevent_req *) *
1093                                             (msg_ctx->num_waiters - i - 1));
1094                         }
1095                         msg_ctx->num_waiters -= 1;
1096                         continue;
1097                 }
1098
1099                 state = tevent_req_data(
1100                         req, struct messaging_filtered_read_state);
1101                 if ((ev == state->ev) &&
1102                     state->filter(rec, state->private_data)) {
1103                         messaging_filtered_read_done(req, rec);
1104
1105                         /*
1106                          * Only the first one gets the fd-array
1107                          */
1108                         rec->num_fds = 0;
1109                         rec->fds = NULL;
1110                 }
1111
1112                 i += 1;
1113         }
1114
1115         if (ev != msg_ctx->event_ctx) {
1116                 struct iovec iov;
1117                 int fds[rec->num_fds];
1118                 int ret;
1119
1120                 /*
1121                  * We've been listening on a nested event
1122                  * context. Messages need to be handled in the main
1123                  * event context, so post to ourselves
1124                  */
1125
1126                 iov.iov_base = rec->buf.data;
1127                 iov.iov_len = rec->buf.length;
1128
1129                 for (i=0; i<rec->num_fds; i++) {
1130                         fds[i] = rec->fds[i];
1131                 }
1132
1133                 ret = messaging_post_self(
1134                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1135                         &iov, 1, fds, rec->num_fds);
1136                 if (ret == 0) {
1137                         return;
1138                 }
1139         }
1140
1141         /*
1142          * If the fd-array isn't used, just close it.
1143          */
1144         for (j=0; j < rec->num_fds; j++) {
1145                 int fd = rec->fds[j];
1146                 close(fd);
1147         }
1148         rec->num_fds = 0;
1149         rec->fds = NULL;
1150 }
1151
1152 static int mess_parent_dgm_cleanup(void *private_data);
1153 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1154
1155 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1156 {
1157         struct tevent_req *req;
1158
1159         req = background_job_send(
1160                 msg, msg->event_ctx, msg, NULL, 0,
1161                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1162                             60*15),
1163                 mess_parent_dgm_cleanup, msg);
1164         if (req == NULL) {
1165                 return false;
1166         }
1167         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1168         return true;
1169 }
1170
1171 static int mess_parent_dgm_cleanup(void *private_data)
1172 {
1173         int ret;
1174
1175         ret = messaging_dgm_wipe();
1176         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1177                    ret ? strerror(ret) : "ok"));
1178         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1179                            60*15);
1180 }
1181
1182 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1183 {
1184         struct messaging_context *msg = tevent_req_callback_data(
1185                 req, struct messaging_context);
1186         NTSTATUS status;
1187
1188         status = background_job_recv(req);
1189         TALLOC_FREE(req);
1190         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1191                   nt_errstr(status)));
1192
1193         req = background_job_send(
1194                 msg, msg->event_ctx, msg, NULL, 0,
1195                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1196                             60*15),
1197                 mess_parent_dgm_cleanup, msg);
1198         if (req == NULL) {
1199                 DEBUG(1, ("background_job_send failed\n"));
1200                 return;
1201         }
1202         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1203 }
1204
1205 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1206 {
1207         int ret;
1208
1209         if (pid == 0) {
1210                 ret = messaging_dgm_wipe();
1211         } else {
1212                 ret = messaging_dgm_cleanup(pid);
1213         }
1214
1215         return ret;
1216 }
1217
1218 struct tevent_context *messaging_tevent_context(
1219         struct messaging_context *msg_ctx)
1220 {
1221         return msg_ctx->event_ctx;
1222 }
1223
1224 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1225 {
1226         return msg_ctx->names_db;
1227 }
1228
1229 /** @} **/