messaging: Add an indirection for messaging_dgm_register_tevent_context
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
87                                                struct messaging_rec *rec);
88 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
89                                    struct tevent_context *ev,
90                                    struct messaging_rec *rec);
91
92 /****************************************************************************
93  A useful function for testing the message system.
94 ****************************************************************************/
95
96 static void ping_message(struct messaging_context *msg_ctx,
97                          void *private_data,
98                          uint32_t msg_type,
99                          struct server_id src,
100                          DATA_BLOB *data)
101 {
102         struct server_id_buf idbuf;
103
104         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
105                   server_id_str_buf(src, &idbuf), (int)data->length,
106                   data->data ? (char *)data->data : ""));
107
108         messaging_send(msg_ctx, src, MSG_PONG, data);
109 }
110
111 static struct messaging_rec *messaging_rec_create(
112         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
113         uint32_t msg_type, const struct iovec *iov, int iovlen,
114         const int *fds, size_t num_fds)
115 {
116         ssize_t buflen;
117         uint8_t *buf;
118         struct messaging_rec *result;
119
120         if (num_fds > INT8_MAX) {
121                 return NULL;
122         }
123
124         buflen = iov_buflen(iov, iovlen);
125         if (buflen == -1) {
126                 return NULL;
127         }
128         buf = talloc_array(mem_ctx, uint8_t, buflen);
129         if (buf == NULL) {
130                 return NULL;
131         }
132         iov_buf(iov, iovlen, buf, buflen);
133
134         {
135                 struct messaging_rec rec;
136                 int64_t fds64[num_fds];
137                 size_t i;
138
139                 for (i=0; i<num_fds; i++) {
140                         fds64[i] = fds[i];
141                 }
142
143                 rec = (struct messaging_rec) {
144                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
145                         .src = src, .dest = dst,
146                         .buf.data = buf, .buf.length = buflen,
147                         .num_fds = num_fds, .fds = fds64,
148                 };
149
150                 result = messaging_rec_dup(mem_ctx, &rec);
151         }
152
153         TALLOC_FREE(buf);
154
155         return result;
156 }
157
158 static void messaging_recv_cb(struct tevent_context *ev,
159                               const uint8_t *msg, size_t msg_len,
160                               int *fds, size_t num_fds,
161                               void *private_data)
162 {
163         struct messaging_context *msg_ctx = talloc_get_type_abort(
164                 private_data, struct messaging_context);
165         struct server_id_buf idbuf;
166         struct messaging_rec rec;
167         int64_t fds64[MIN(num_fds, INT8_MAX)];
168         size_t i;
169
170         if (msg_len < MESSAGE_HDR_LENGTH) {
171                 DBG_WARNING("message too short: %zu\n", msg_len);
172                 goto close_fail;
173         }
174
175         if (num_fds > INT8_MAX) {
176                 DBG_WARNING("too many fds: %zu\n", num_fds);
177                 goto close_fail;
178         }
179
180         /*
181          * "consume" the fds by copying them and setting
182          * the original variable to -1
183          */
184         for (i=0; i < num_fds; i++) {
185                 fds64[i] = fds[i];
186                 fds[i] = -1;
187         }
188
189         rec = (struct messaging_rec) {
190                 .msg_version = MESSAGE_VERSION,
191                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
192                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
193                 .num_fds = num_fds,
194                 .fds = fds64,
195         };
196
197         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
198
199         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
200                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
201                   server_id_str_buf(rec.src, &idbuf));
202
203         messaging_dispatch_rec(msg_ctx, ev, &rec);
204         return;
205
206 close_fail:
207         for (i=0; i < num_fds; i++) {
208                 close(fds[i]);
209         }
210 }
211
212 static int messaging_context_destructor(struct messaging_context *ctx)
213 {
214         unsigned i;
215
216         for (i=0; i<ctx->num_new_waiters; i++) {
217                 if (ctx->new_waiters[i] != NULL) {
218                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
219                         ctx->new_waiters[i] = NULL;
220                 }
221         }
222         for (i=0; i<ctx->num_waiters; i++) {
223                 if (ctx->waiters[i] != NULL) {
224                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
225                         ctx->waiters[i] = NULL;
226                 }
227         }
228
229         return 0;
230 }
231
232 static const char *private_path(const char *name)
233 {
234         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
235 }
236
237 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
238                                          struct tevent_context *ev)
239 {
240         struct messaging_context *ctx;
241         int ret;
242         const char *lck_path;
243         const char *priv_path;
244         bool ok;
245
246         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
247                 return NULL;
248         }
249
250         ctx->id = (struct server_id) {
251                 .pid = getpid(), .vnn = NONCLUSTER_VNN
252         };
253
254         ctx->event_ctx = ev;
255
256         sec_init();
257
258         lck_path = lock_path("msg.lock");
259         if (lck_path == NULL) {
260                 TALLOC_FREE(ctx);
261                 return NULL;
262         }
263
264         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
265                                               0755);
266         if (!ok) {
267                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
268                            __func__, strerror(errno)));
269                 TALLOC_FREE(ctx);
270                 return NULL;
271         }
272
273         priv_path = private_path("msg.sock");
274         if (priv_path == NULL) {
275                 TALLOC_FREE(ctx);
276                 return NULL;
277         }
278
279         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
280                                               0700);
281         if (!ok) {
282                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
283                            __func__, strerror(errno)));
284                 TALLOC_FREE(ctx);
285                 return NULL;
286         }
287
288         ctx->msg_dgm_ref = messaging_dgm_ref(
289                 ctx, ctx->event_ctx, &ctx->id.unique_id,
290                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
291
292         if (ctx->msg_dgm_ref == NULL) {
293                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
294                 TALLOC_FREE(ctx);
295                 return NULL;
296         }
297
298         talloc_set_destructor(ctx, messaging_context_destructor);
299
300         if (lp_clustering()) {
301                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
302
303                 if (ret != 0) {
304                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
305                                   strerror(ret)));
306                         TALLOC_FREE(ctx);
307                         return NULL;
308                 }
309         }
310         ctx->id.vnn = get_my_vnn();
311
312         ctx->names_db = server_id_db_init(
313                 ctx, ctx->id, lp_lock_directory(), 0,
314                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
315         if (ctx->names_db == NULL) {
316                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
317                 TALLOC_FREE(ctx);
318                 return NULL;
319         }
320
321         messaging_register(ctx, NULL, MSG_PING, ping_message);
322
323         /* Register some debugging related messages */
324
325         register_msg_pool_usage(ctx);
326         register_dmalloc_msgs(ctx);
327         debug_register_msgs(ctx);
328
329         {
330                 struct server_id_buf tmp;
331                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
332         }
333
334         return ctx;
335 }
336
337 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
338 {
339         return msg_ctx->id;
340 }
341
342 /*
343  * re-init after a fork
344  */
345 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
346 {
347         int ret;
348         char *lck_path;
349
350         TALLOC_FREE(msg_ctx->msg_dgm_ref);
351
352         msg_ctx->id = (struct server_id) {
353                 .pid = getpid(), .vnn = msg_ctx->id.vnn
354         };
355
356         lck_path = lock_path("msg.lock");
357         if (lck_path == NULL) {
358                 return NT_STATUS_NO_MEMORY;
359         }
360
361         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
362                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
363                 private_path("msg.sock"), lck_path,
364                 messaging_recv_cb, msg_ctx, &ret);
365
366         if (msg_ctx->msg_dgm_ref == NULL) {
367                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
368                 return map_nt_error_from_unix(ret);
369         }
370
371         if (lp_clustering()) {
372                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
373                                              msg_ctx->remote);
374
375                 if (ret != 0) {
376                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
377                                   strerror(ret)));
378                         return map_nt_error_from_unix(ret);
379                 }
380         }
381
382         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
383
384         return NT_STATUS_OK;
385 }
386
387
388 /*
389  * Register a dispatch function for a particular message type. Allow multiple
390  * registrants
391 */
392 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
393                             void *private_data,
394                             uint32_t msg_type,
395                             void (*fn)(struct messaging_context *msg,
396                                        void *private_data, 
397                                        uint32_t msg_type, 
398                                        struct server_id server_id,
399                                        DATA_BLOB *data))
400 {
401         struct messaging_callback *cb;
402
403         DEBUG(5, ("Registering messaging pointer for type %u - "
404                   "private_data=%p\n",
405                   (unsigned)msg_type, private_data));
406
407         /*
408          * Only one callback per type
409          */
410
411         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
412                 /* we allow a second registration of the same message
413                    type if it has a different private pointer. This is
414                    needed in, for example, the internal notify code,
415                    which creates a new notify context for each tree
416                    connect, and expects to receive messages to each of
417                    them. */
418                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
419                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
420                                   (unsigned)msg_type, private_data));
421                         cb->fn = fn;
422                         cb->private_data = private_data;
423                         return NT_STATUS_OK;
424                 }
425         }
426
427         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
428                 return NT_STATUS_NO_MEMORY;
429         }
430
431         cb->msg_type = msg_type;
432         cb->fn = fn;
433         cb->private_data = private_data;
434
435         DLIST_ADD(msg_ctx->callbacks, cb);
436         return NT_STATUS_OK;
437 }
438
439 /*
440   De-register the function for a particular message type.
441 */
442 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
443                           void *private_data)
444 {
445         struct messaging_callback *cb, *next;
446
447         for (cb = ctx->callbacks; cb; cb = next) {
448                 next = cb->next;
449                 if ((cb->msg_type == msg_type)
450                     && (cb->private_data == private_data)) {
451                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
452                                   (unsigned)msg_type, private_data));
453                         DLIST_REMOVE(ctx->callbacks, cb);
454                         TALLOC_FREE(cb);
455                 }
456         }
457 }
458
459 /*
460   Send a message to a particular server
461 */
462 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
463                         struct server_id server, uint32_t msg_type,
464                         const DATA_BLOB *data)
465 {
466         struct iovec iov = {0};
467
468         if (data != NULL) {
469                 iov.iov_base = data->data;
470                 iov.iov_len = data->length;
471         };
472
473         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
474 }
475
476 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
477                             struct server_id server, uint32_t msg_type,
478                             const uint8_t *buf, size_t len)
479 {
480         DATA_BLOB blob = data_blob_const(buf, len);
481         return messaging_send(msg_ctx, server, msg_type, &blob);
482 }
483
484 struct messaging_post_state {
485         struct messaging_context *msg_ctx;
486         struct messaging_rec *rec;
487 };
488
489 static void messaging_post_handler(struct tevent_context *ev,
490                                    struct tevent_immediate *ti,
491                                    void *private_data);
492
493 static int messaging_post_self(struct messaging_context *msg_ctx,
494                                struct server_id src, struct server_id dst,
495                                uint32_t msg_type,
496                                const struct iovec *iov, int iovlen,
497                                const int *fds, size_t num_fds)
498 {
499         struct tevent_immediate *ti;
500         struct messaging_post_state *state;
501
502         state = talloc(msg_ctx, struct messaging_post_state);
503         if (state == NULL) {
504                 return ENOMEM;
505         }
506         state->msg_ctx = msg_ctx;
507
508         ti = tevent_create_immediate(state);
509         if (ti == NULL) {
510                 goto fail;
511         }
512         state->rec = messaging_rec_create(
513                 state, src, dst, msg_type, iov, iovlen, fds, num_fds);
514         if (state->rec == NULL) {
515                 goto fail;
516         }
517
518         tevent_schedule_immediate(ti, msg_ctx->event_ctx,
519                                   messaging_post_handler, state);
520         return 0;
521
522 fail:
523         TALLOC_FREE(state);
524         return ENOMEM;
525 }
526
527 static void messaging_post_handler(struct tevent_context *ev,
528                                    struct tevent_immediate *ti,
529                                    void *private_data)
530 {
531         struct messaging_post_state *state = talloc_get_type_abort(
532                 private_data, struct messaging_post_state);
533         messaging_dispatch_rec(state->msg_ctx, ev, state->rec);
534         TALLOC_FREE(state);
535 }
536
537 int messaging_send_iov_from(struct messaging_context *msg_ctx,
538                             struct server_id src, struct server_id dst,
539                             uint32_t msg_type,
540                             const struct iovec *iov, int iovlen,
541                             const int *fds, size_t num_fds)
542 {
543         int ret;
544         uint8_t hdr[MESSAGE_HDR_LENGTH];
545         struct iovec iov2[iovlen+1];
546
547         if (server_id_is_disconnected(&dst)) {
548                 return EINVAL;
549         }
550
551         if (num_fds > INT8_MAX) {
552                 return EINVAL;
553         }
554
555         if (dst.vnn != msg_ctx->id.vnn) {
556                 if (num_fds > 0) {
557                         return ENOSYS;
558                 }
559
560                 ret = msg_ctx->remote->send_fn(src, dst,
561                                                msg_type, iov, iovlen,
562                                                NULL, 0,
563                                                msg_ctx->remote);
564                 return ret;
565         }
566
567         if (server_id_equal(&dst, &msg_ctx->id)) {
568                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
569                                           iov, iovlen, fds, num_fds);
570                 return ret;
571         }
572
573         message_hdr_put(hdr, msg_type, src, dst);
574         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
575         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
576
577         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
578
579         if (ret == EACCES) {
580                 become_root();
581                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
582                                          fds, num_fds);
583                 unbecome_root();
584         }
585
586         return ret;
587 }
588
589 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
590                             struct server_id dst, uint32_t msg_type,
591                             const struct iovec *iov, int iovlen,
592                             const int *fds, size_t num_fds)
593 {
594         int ret;
595
596         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
597                                       iov, iovlen, fds, num_fds);
598         if (ret != 0) {
599                 return map_nt_error_from_unix(ret);
600         }
601         return NT_STATUS_OK;
602 }
603
604 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
605                                                struct messaging_rec *rec)
606 {
607         struct messaging_rec *result;
608         size_t fds_size = sizeof(int64_t) * rec->num_fds;
609         size_t payload_len;
610
611         payload_len = rec->buf.length + fds_size;
612         if (payload_len < rec->buf.length) {
613                 /* overflow */
614                 return NULL;
615         }
616
617         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
618                                       payload_len);
619         if (result == NULL) {
620                 return NULL;
621         }
622         *result = *rec;
623
624         /* Doesn't fail, see talloc_pooled_object */
625
626         result->buf.data = talloc_memdup(result, rec->buf.data,
627                                          rec->buf.length);
628
629         result->fds = NULL;
630         if (result->num_fds > 0) {
631                 result->fds = talloc_memdup(result, rec->fds, fds_size);
632         }
633
634         return result;
635 }
636
637 struct messaging_filtered_read_state {
638         struct tevent_context *ev;
639         struct messaging_context *msg_ctx;
640         struct messaging_dgm_fde *fde;
641
642         bool (*filter)(struct messaging_rec *rec, void *private_data);
643         void *private_data;
644
645         struct messaging_rec *rec;
646 };
647
648 static void messaging_filtered_read_cleanup(struct tevent_req *req,
649                                             enum tevent_req_state req_state);
650
651 struct tevent_req *messaging_filtered_read_send(
652         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
653         struct messaging_context *msg_ctx,
654         bool (*filter)(struct messaging_rec *rec, void *private_data),
655         void *private_data)
656 {
657         struct tevent_req *req;
658         struct messaging_filtered_read_state *state;
659         size_t new_waiters_len;
660
661         req = tevent_req_create(mem_ctx, &state,
662                                 struct messaging_filtered_read_state);
663         if (req == NULL) {
664                 return NULL;
665         }
666         state->ev = ev;
667         state->msg_ctx = msg_ctx;
668         state->filter = filter;
669         state->private_data = private_data;
670
671         /*
672          * We have to defer the callback here, as we might be called from
673          * within a different tevent_context than state->ev
674          */
675         tevent_req_defer_callback(req, state->ev);
676
677         state->fde = messaging_dgm_register_tevent_context(state, ev);
678         if (tevent_req_nomem(state->fde, req)) {
679                 return tevent_req_post(req, ev);
680         }
681
682         /*
683          * We add ourselves to the "new_waiters" array, not the "waiters"
684          * array. If we are called from within messaging_read_done,
685          * messaging_dispatch_rec will be in an active for-loop on
686          * "waiters". We must be careful not to mess with this array, because
687          * it could mean that a single event is being delivered twice.
688          */
689
690         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
691
692         if (new_waiters_len == msg_ctx->num_new_waiters) {
693                 struct tevent_req **tmp;
694
695                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
696                                      struct tevent_req *, new_waiters_len+1);
697                 if (tevent_req_nomem(tmp, req)) {
698                         return tevent_req_post(req, ev);
699                 }
700                 msg_ctx->new_waiters = tmp;
701         }
702
703         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
704         msg_ctx->num_new_waiters += 1;
705         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
706
707         return req;
708 }
709
710 static void messaging_filtered_read_cleanup(struct tevent_req *req,
711                                             enum tevent_req_state req_state)
712 {
713         struct messaging_filtered_read_state *state = tevent_req_data(
714                 req, struct messaging_filtered_read_state);
715         struct messaging_context *msg_ctx = state->msg_ctx;
716         unsigned i;
717
718         tevent_req_set_cleanup_fn(req, NULL);
719
720         TALLOC_FREE(state->fde);
721
722         /*
723          * Just set the [new_]waiters entry to NULL, be careful not to mess
724          * with the other "waiters" array contents. We are often called from
725          * within "messaging_dispatch_rec", which loops over
726          * "waiters". Messing with the "waiters" array will mess up that
727          * for-loop.
728          */
729
730         for (i=0; i<msg_ctx->num_waiters; i++) {
731                 if (msg_ctx->waiters[i] == req) {
732                         msg_ctx->waiters[i] = NULL;
733                         return;
734                 }
735         }
736
737         for (i=0; i<msg_ctx->num_new_waiters; i++) {
738                 if (msg_ctx->new_waiters[i] == req) {
739                         msg_ctx->new_waiters[i] = NULL;
740                         return;
741                 }
742         }
743 }
744
745 static void messaging_filtered_read_done(struct tevent_req *req,
746                                          struct messaging_rec *rec)
747 {
748         struct messaging_filtered_read_state *state = tevent_req_data(
749                 req, struct messaging_filtered_read_state);
750
751         state->rec = messaging_rec_dup(state, rec);
752         if (tevent_req_nomem(state->rec, req)) {
753                 return;
754         }
755         tevent_req_done(req);
756 }
757
758 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
759                                  struct messaging_rec **presult)
760 {
761         struct messaging_filtered_read_state *state = tevent_req_data(
762                 req, struct messaging_filtered_read_state);
763         int err;
764
765         if (tevent_req_is_unix_error(req, &err)) {
766                 tevent_req_received(req);
767                 return err;
768         }
769         if (presult != NULL) {
770                 *presult = talloc_move(mem_ctx, &state->rec);
771         }
772         return 0;
773 }
774
775 struct messaging_read_state {
776         uint32_t msg_type;
777         struct messaging_rec *rec;
778 };
779
780 static bool messaging_read_filter(struct messaging_rec *rec,
781                                   void *private_data);
782 static void messaging_read_done(struct tevent_req *subreq);
783
784 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
785                                        struct tevent_context *ev,
786                                        struct messaging_context *msg,
787                                        uint32_t msg_type)
788 {
789         struct tevent_req *req, *subreq;
790         struct messaging_read_state *state;
791
792         req = tevent_req_create(mem_ctx, &state,
793                                 struct messaging_read_state);
794         if (req == NULL) {
795                 return NULL;
796         }
797         state->msg_type = msg_type;
798
799         subreq = messaging_filtered_read_send(state, ev, msg,
800                                               messaging_read_filter, state);
801         if (tevent_req_nomem(subreq, req)) {
802                 return tevent_req_post(req, ev);
803         }
804         tevent_req_set_callback(subreq, messaging_read_done, req);
805         return req;
806 }
807
808 static bool messaging_read_filter(struct messaging_rec *rec,
809                                   void *private_data)
810 {
811         struct messaging_read_state *state = talloc_get_type_abort(
812                 private_data, struct messaging_read_state);
813
814         if (rec->num_fds != 0) {
815                 return false;
816         }
817
818         return rec->msg_type == state->msg_type;
819 }
820
821 static void messaging_read_done(struct tevent_req *subreq)
822 {
823         struct tevent_req *req = tevent_req_callback_data(
824                 subreq, struct tevent_req);
825         struct messaging_read_state *state = tevent_req_data(
826                 req, struct messaging_read_state);
827         int ret;
828
829         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
830         TALLOC_FREE(subreq);
831         if (tevent_req_error(req, ret)) {
832                 return;
833         }
834         tevent_req_done(req);
835 }
836
837 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
838                         struct messaging_rec **presult)
839 {
840         struct messaging_read_state *state = tevent_req_data(
841                 req, struct messaging_read_state);
842         int err;
843
844         if (tevent_req_is_unix_error(req, &err)) {
845                 return err;
846         }
847         if (presult != NULL) {
848                 *presult = talloc_move(mem_ctx, &state->rec);
849         }
850         return 0;
851 }
852
853 struct messaging_handler_state {
854         struct tevent_context *ev;
855         struct messaging_context *msg_ctx;
856         uint32_t msg_type;
857         bool (*handler)(struct messaging_context *msg_ctx,
858                         struct messaging_rec **rec, void *private_data);
859         void *private_data;
860 };
861
862 static void messaging_handler_got_msg(struct tevent_req *subreq);
863
864 struct tevent_req *messaging_handler_send(
865         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
866         struct messaging_context *msg_ctx, uint32_t msg_type,
867         bool (*handler)(struct messaging_context *msg_ctx,
868                         struct messaging_rec **rec, void *private_data),
869         void *private_data)
870 {
871         struct tevent_req *req, *subreq;
872         struct messaging_handler_state *state;
873
874         req = tevent_req_create(mem_ctx, &state,
875                                 struct messaging_handler_state);
876         if (req == NULL) {
877                 return NULL;
878         }
879         state->ev = ev;
880         state->msg_ctx = msg_ctx;
881         state->msg_type = msg_type;
882         state->handler = handler;
883         state->private_data = private_data;
884
885         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
886                                      state->msg_type);
887         if (tevent_req_nomem(subreq, req)) {
888                 return tevent_req_post(req, ev);
889         }
890         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
891         return req;
892 }
893
894 static void messaging_handler_got_msg(struct tevent_req *subreq)
895 {
896         struct tevent_req *req = tevent_req_callback_data(
897                 subreq, struct tevent_req);
898         struct messaging_handler_state *state = tevent_req_data(
899                 req, struct messaging_handler_state);
900         struct messaging_rec *rec;
901         int ret;
902         bool ok;
903
904         ret = messaging_read_recv(subreq, state, &rec);
905         TALLOC_FREE(subreq);
906         if (tevent_req_error(req, ret)) {
907                 return;
908         }
909
910         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
911                                      state->msg_type);
912         if (tevent_req_nomem(subreq, req)) {
913                 return;
914         }
915         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
916
917         ok = state->handler(state->msg_ctx, &rec, state->private_data);
918         TALLOC_FREE(rec);
919         if (ok) {
920                 /*
921                  * Next round
922                  */
923                 return;
924         }
925         TALLOC_FREE(subreq);
926         tevent_req_done(req);
927 }
928
929 int messaging_handler_recv(struct tevent_req *req)
930 {
931         return tevent_req_simple_recv_unix(req);
932 }
933
934 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
935 {
936         if (msg_ctx->num_new_waiters == 0) {
937                 return true;
938         }
939
940         if (talloc_array_length(msg_ctx->waiters) <
941             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
942                 struct tevent_req **tmp;
943                 tmp = talloc_realloc(
944                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
945                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
946                 if (tmp == NULL) {
947                         DEBUG(1, ("%s: talloc failed\n", __func__));
948                         return false;
949                 }
950                 msg_ctx->waiters = tmp;
951         }
952
953         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
954                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
955
956         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
957         msg_ctx->num_new_waiters = 0;
958
959         return true;
960 }
961
962 static void messaging_dispatch_classic(struct messaging_context *msg_ctx,
963                                        struct messaging_rec *rec)
964 {
965         struct messaging_callback *cb, *next;
966
967         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
968                 size_t j;
969
970                 next = cb->next;
971                 if (cb->msg_type != rec->msg_type) {
972                         continue;
973                 }
974
975                 /*
976                  * the old style callbacks don't support fd passing
977                  */
978                 for (j=0; j < rec->num_fds; j++) {
979                         int fd = rec->fds[j];
980                         close(fd);
981                 }
982                 rec->num_fds = 0;
983                 rec->fds = NULL;
984
985                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
986                        rec->src, &rec->buf);
987
988                 /*
989                  * we continue looking for matching messages after finding
990                  * one. This matters for subsystems like the internal notify
991                  * code which register more than one handler for the same
992                  * message type
993                  */
994         }
995 }
996
997 /*
998   Dispatch one messaging_rec
999 */
1000 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1001                                    struct tevent_context *ev,
1002                                    struct messaging_rec *rec)
1003 {
1004         unsigned i;
1005         size_t j;
1006
1007         if (ev == msg_ctx->event_ctx) {
1008                 messaging_dispatch_classic(msg_ctx, rec);
1009         }
1010
1011         if (!messaging_append_new_waiters(msg_ctx)) {
1012                 for (j=0; j < rec->num_fds; j++) {
1013                         int fd = rec->fds[j];
1014                         close(fd);
1015                 }
1016                 rec->num_fds = 0;
1017                 rec->fds = NULL;
1018                 return;
1019         }
1020
1021         i = 0;
1022         while (i < msg_ctx->num_waiters) {
1023                 struct tevent_req *req;
1024                 struct messaging_filtered_read_state *state;
1025
1026                 req = msg_ctx->waiters[i];
1027                 if (req == NULL) {
1028                         /*
1029                          * This got cleaned up. In the meantime,
1030                          * move everything down one. We need
1031                          * to keep the order of waiters, as
1032                          * other code may depend on this.
1033                          */
1034                         if (i < msg_ctx->num_waiters - 1) {
1035                                 memmove(&msg_ctx->waiters[i],
1036                                         &msg_ctx->waiters[i+1],
1037                                         sizeof(struct tevent_req *) *
1038                                             (msg_ctx->num_waiters - i - 1));
1039                         }
1040                         msg_ctx->num_waiters -= 1;
1041                         continue;
1042                 }
1043
1044                 state = tevent_req_data(
1045                         req, struct messaging_filtered_read_state);
1046                 if ((ev == state->ev) &&
1047                     state->filter(rec, state->private_data)) {
1048                         messaging_filtered_read_done(req, rec);
1049
1050                         /*
1051                          * Only the first one gets the fd-array
1052                          */
1053                         rec->num_fds = 0;
1054                         rec->fds = NULL;
1055                 }
1056
1057                 i += 1;
1058         }
1059
1060         if (ev != msg_ctx->event_ctx) {
1061                 struct iovec iov;
1062                 int fds[rec->num_fds];
1063                 int ret;
1064
1065                 /*
1066                  * We've been listening on a nested event
1067                  * context. Messages need to be handled in the main
1068                  * event context, so post to ourselves
1069                  */
1070
1071                 iov.iov_base = rec->buf.data;
1072                 iov.iov_len = rec->buf.length;
1073
1074                 for (i=0; i<rec->num_fds; i++) {
1075                         fds[i] = rec->fds[i];
1076                 }
1077
1078                 ret = messaging_post_self(
1079                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1080                         &iov, 1, fds, rec->num_fds);
1081                 if (ret == 0) {
1082                         return;
1083                 }
1084         }
1085
1086         /*
1087          * If the fd-array isn't used, just close it.
1088          */
1089         for (j=0; j < rec->num_fds; j++) {
1090                 int fd = rec->fds[j];
1091                 close(fd);
1092         }
1093         rec->num_fds = 0;
1094         rec->fds = NULL;
1095 }
1096
1097 static int mess_parent_dgm_cleanup(void *private_data);
1098 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1099
1100 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1101 {
1102         struct tevent_req *req;
1103
1104         req = background_job_send(
1105                 msg, msg->event_ctx, msg, NULL, 0,
1106                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1107                             60*15),
1108                 mess_parent_dgm_cleanup, msg);
1109         if (req == NULL) {
1110                 return false;
1111         }
1112         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1113         return true;
1114 }
1115
1116 static int mess_parent_dgm_cleanup(void *private_data)
1117 {
1118         int ret;
1119
1120         ret = messaging_dgm_wipe();
1121         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1122                    ret ? strerror(ret) : "ok"));
1123         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1124                            60*15);
1125 }
1126
1127 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1128 {
1129         struct messaging_context *msg = tevent_req_callback_data(
1130                 req, struct messaging_context);
1131         NTSTATUS status;
1132
1133         status = background_job_recv(req);
1134         TALLOC_FREE(req);
1135         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1136                   nt_errstr(status)));
1137
1138         req = background_job_send(
1139                 msg, msg->event_ctx, msg, NULL, 0,
1140                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1141                             60*15),
1142                 mess_parent_dgm_cleanup, msg);
1143         if (req == NULL) {
1144                 DEBUG(1, ("background_job_send failed\n"));
1145                 return;
1146         }
1147         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1148 }
1149
1150 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1151 {
1152         int ret;
1153
1154         if (pid == 0) {
1155                 ret = messaging_dgm_wipe();
1156         } else {
1157                 ret = messaging_dgm_cleanup(pid);
1158         }
1159
1160         return ret;
1161 }
1162
1163 struct tevent_context *messaging_tevent_context(
1164         struct messaging_context *msg_ctx)
1165 {
1166         return msg_ctx->event_ctx;
1167 }
1168
1169 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1170 {
1171         return msg_ctx->names_db;
1172 }
1173
1174 /** @} **/