s3:messaging: add fds-array to message-backend send function
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55
56 struct messaging_callback {
57         struct messaging_callback *prev, *next;
58         uint32 msg_type;
59         void (*fn)(struct messaging_context *msg, void *private_data, 
60                    uint32_t msg_type, 
61                    struct server_id server_id, DATA_BLOB *data);
62         void *private_data;
63 };
64
65 struct messaging_context {
66         struct server_id id;
67         struct tevent_context *event_ctx;
68         struct messaging_callback *callbacks;
69
70         struct tevent_req **new_waiters;
71         unsigned num_new_waiters;
72
73         struct tevent_req **waiters;
74         unsigned num_waiters;
75
76         struct messaging_backend *remote;
77 };
78
79 struct messaging_hdr {
80         int msg_type;
81         struct server_id dst;
82         struct server_id src;
83 };
84
85 /****************************************************************************
86  A useful function for testing the message system.
87 ****************************************************************************/
88
89 static void ping_message(struct messaging_context *msg_ctx,
90                          void *private_data,
91                          uint32_t msg_type,
92                          struct server_id src,
93                          DATA_BLOB *data)
94 {
95         struct server_id_buf idbuf;
96
97         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
98                   server_id_str_buf(src, &idbuf), (int)data->length,
99                   data->data ? (char *)data->data : ""));
100
101         messaging_send(msg_ctx, src, MSG_PONG, data);
102 }
103
104 /****************************************************************************
105  Register/replace a dispatch function for a particular message type.
106  JRA changed Dec 13 2006. Only one message handler now permitted per type.
107  *NOTE*: Dispatch functions must be able to cope with incoming
108  messages on an *odd* byte boundary.
109 ****************************************************************************/
110
111 struct msg_all {
112         struct messaging_context *msg_ctx;
113         int msg_type;
114         uint32 msg_flag;
115         const void *buf;
116         size_t len;
117         int n_sent;
118 };
119
120 /****************************************************************************
121  Send one of the messages for the broadcast.
122 ****************************************************************************/
123
124 static int traverse_fn(struct db_record *rec, const struct server_id *id,
125                        uint32_t msg_flags, void *state)
126 {
127         struct msg_all *msg_all = (struct msg_all *)state;
128         NTSTATUS status;
129
130         /* Don't send if the receiver hasn't registered an interest. */
131
132         if((msg_flags & msg_all->msg_flag) == 0) {
133                 return 0;
134         }
135
136         /* If the msg send fails because the pid was not found (i.e. smbd died), 
137          * the msg has already been deleted from the messages.tdb.*/
138
139         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
140                                     (const uint8_t *)msg_all->buf, msg_all->len);
141
142         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
143                 struct server_id_buf idbuf;
144
145                 /*
146                  * If the pid was not found delete the entry from
147                  * serverid.tdb
148                  */
149
150                 DEBUG(2, ("pid %s doesn't exist\n",
151                           server_id_str_buf(*id, &idbuf)));
152
153                 dbwrap_record_delete(rec);
154         }
155         msg_all->n_sent++;
156         return 0;
157 }
158
159 /**
160  * Send a message to all smbd processes.
161  *
162  * It isn't very efficient, but should be OK for the sorts of
163  * applications that use it. When we need efficient broadcast we can add
164  * it.
165  *
166  * @param n_sent Set to the number of messages sent.  This should be
167  * equal to the number of processes, but be careful for races.
168  *
169  * @retval True for success.
170  **/
171 bool message_send_all(struct messaging_context *msg_ctx,
172                       int msg_type,
173                       const void *buf, size_t len,
174                       int *n_sent)
175 {
176         struct msg_all msg_all;
177
178         msg_all.msg_type = msg_type;
179         if (msg_type < 0x100) {
180                 msg_all.msg_flag = FLAG_MSG_GENERAL;
181         } else if (msg_type > 0x100 && msg_type < 0x200) {
182                 msg_all.msg_flag = FLAG_MSG_NMBD;
183         } else if (msg_type > 0x200 && msg_type < 0x300) {
184                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
185         } else if (msg_type > 0x300 && msg_type < 0x400) {
186                 msg_all.msg_flag = FLAG_MSG_SMBD;
187         } else if (msg_type > 0x400 && msg_type < 0x600) {
188                 msg_all.msg_flag = FLAG_MSG_WINBIND;
189         } else if (msg_type > 4000 && msg_type < 5000) {
190                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
191         } else {
192                 return false;
193         }
194
195         msg_all.buf = buf;
196         msg_all.len = len;
197         msg_all.n_sent = 0;
198         msg_all.msg_ctx = msg_ctx;
199
200         serverid_traverse(traverse_fn, &msg_all);
201         if (n_sent)
202                 *n_sent = msg_all.n_sent;
203         return true;
204 }
205
206 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
207                               const int *fds, size_t num_fds,
208                               void *private_data)
209 {
210         struct messaging_context *msg_ctx = talloc_get_type_abort(
211                 private_data, struct messaging_context);
212         const struct messaging_hdr *hdr;
213         struct server_id_buf idbuf;
214         struct messaging_rec rec;
215         int64_t fds64[MIN(num_fds, INT8_MAX)];
216         size_t i;
217
218         if (msg_len < sizeof(*hdr)) {
219                 for (i=0; i < num_fds; i++) {
220                         close(fds[i]);
221                 }
222                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
223                 return;
224         }
225
226         if (num_fds > INT8_MAX) {
227                 for (i=0; i < num_fds; i++) {
228                         close(fds[i]);
229                 }
230                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
231                 return;
232         }
233
234         for (i=0; i < num_fds; i++) {
235                 fds64[i] = fds[i];
236         }
237
238         /*
239          * messages_dgm guarantees alignment, so we can cast here
240          */
241         hdr = (const struct messaging_hdr *)msg;
242
243         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
244                    __func__, (unsigned)hdr->msg_type,
245                    (unsigned)(msg_len - sizeof(*hdr)),
246                    (unsigned)num_fds,
247                    server_id_str_buf(hdr->src, &idbuf)));
248
249         rec = (struct messaging_rec) {
250                 .msg_version = MESSAGE_VERSION,
251                 .msg_type = hdr->msg_type,
252                 .src = hdr->src,
253                 .dest = hdr->dst,
254                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
255                 .buf.length = msg_len - sizeof(*hdr),
256                 .num_fds = num_fds,
257                 .fds = fds64,
258         };
259
260         messaging_dispatch_rec(msg_ctx, &rec);
261 }
262
263 static int messaging_context_destructor(struct messaging_context *ctx)
264 {
265         messaging_dgm_destroy();
266         return 0;
267 }
268
269 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
270                                          struct tevent_context *ev)
271 {
272         struct messaging_context *ctx;
273         NTSTATUS status;
274         int ret;
275
276         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
277                 return NULL;
278         }
279
280         ctx->id = procid_self();
281         ctx->event_ctx = ev;
282
283         sec_init();
284
285         ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
286                                  lp_cache_directory(), sec_initial_uid(),
287                                  messaging_recv_cb, ctx);
288
289         if (ret != 0) {
290                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
291                 TALLOC_FREE(ctx);
292                 return NULL;
293         }
294
295         talloc_set_destructor(ctx, messaging_context_destructor);
296
297         if (lp_clustering()) {
298                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
299
300                 if (!NT_STATUS_IS_OK(status)) {
301                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
302                                   nt_errstr(status)));
303                         TALLOC_FREE(ctx);
304                         return NULL;
305                 }
306         }
307         ctx->id.vnn = get_my_vnn();
308
309         messaging_register(ctx, NULL, MSG_PING, ping_message);
310
311         /* Register some debugging related messages */
312
313         register_msg_pool_usage(ctx);
314         register_dmalloc_msgs(ctx);
315         debug_register_msgs(ctx);
316
317         return ctx;
318 }
319
320 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
321 {
322         return msg_ctx->id;
323 }
324
325 /*
326  * re-init after a fork
327  */
328 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
329 {
330         NTSTATUS status;
331         int ret;
332
333         messaging_dgm_destroy();
334
335         msg_ctx->id = procid_self();
336
337         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
338                                  lp_cache_directory(), sec_initial_uid(),
339                                  messaging_recv_cb, msg_ctx);
340         if (ret != 0) {
341                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
342                 return map_nt_error_from_unix(ret);
343         }
344
345         TALLOC_FREE(msg_ctx->remote);
346
347         if (lp_clustering()) {
348                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
349                                               &msg_ctx->remote);
350
351                 if (!NT_STATUS_IS_OK(status)) {
352                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
353                                   nt_errstr(status)));
354                         return status;
355                 }
356         }
357
358         return NT_STATUS_OK;
359 }
360
361
362 /*
363  * Register a dispatch function for a particular message type. Allow multiple
364  * registrants
365 */
366 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
367                             void *private_data,
368                             uint32_t msg_type,
369                             void (*fn)(struct messaging_context *msg,
370                                        void *private_data, 
371                                        uint32_t msg_type, 
372                                        struct server_id server_id,
373                                        DATA_BLOB *data))
374 {
375         struct messaging_callback *cb;
376
377         DEBUG(5, ("Registering messaging pointer for type %u - "
378                   "private_data=%p\n",
379                   (unsigned)msg_type, private_data));
380
381         /*
382          * Only one callback per type
383          */
384
385         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
386                 /* we allow a second registration of the same message
387                    type if it has a different private pointer. This is
388                    needed in, for example, the internal notify code,
389                    which creates a new notify context for each tree
390                    connect, and expects to receive messages to each of
391                    them. */
392                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
393                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
394                                   (unsigned)msg_type, private_data));
395                         cb->fn = fn;
396                         cb->private_data = private_data;
397                         return NT_STATUS_OK;
398                 }
399         }
400
401         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
402                 return NT_STATUS_NO_MEMORY;
403         }
404
405         cb->msg_type = msg_type;
406         cb->fn = fn;
407         cb->private_data = private_data;
408
409         DLIST_ADD(msg_ctx->callbacks, cb);
410         return NT_STATUS_OK;
411 }
412
413 /*
414   De-register the function for a particular message type.
415 */
416 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
417                           void *private_data)
418 {
419         struct messaging_callback *cb, *next;
420
421         for (cb = ctx->callbacks; cb; cb = next) {
422                 next = cb->next;
423                 if ((cb->msg_type == msg_type)
424                     && (cb->private_data == private_data)) {
425                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
426                                   (unsigned)msg_type, private_data));
427                         DLIST_REMOVE(ctx->callbacks, cb);
428                         TALLOC_FREE(cb);
429                 }
430         }
431 }
432
433 /*
434   Send a message to a particular server
435 */
436 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
437                         struct server_id server, uint32_t msg_type,
438                         const DATA_BLOB *data)
439 {
440         struct iovec iov;
441
442         iov.iov_base = data->data;
443         iov.iov_len = data->length;
444
445         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
446 }
447
448 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
449                             struct server_id server, uint32_t msg_type,
450                             const uint8_t *buf, size_t len)
451 {
452         DATA_BLOB blob = data_blob_const(buf, len);
453         return messaging_send(msg_ctx, server, msg_type, &blob);
454 }
455
456 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
457                             struct server_id server, uint32_t msg_type,
458                             const struct iovec *iov, int iovlen)
459 {
460         int ret;
461         struct messaging_hdr hdr;
462         struct iovec iov2[iovlen+1];
463
464         if (server_id_is_disconnected(&server)) {
465                 return NT_STATUS_INVALID_PARAMETER_MIX;
466         }
467
468         if (!procid_is_local(&server)) {
469                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
470                                                msg_type, iov, iovlen,
471                                                NULL, 0,
472                                                msg_ctx->remote);
473                 if (ret != 0) {
474                         return map_nt_error_from_unix(ret);
475                 }
476                 return NT_STATUS_OK;
477         }
478
479         if (server_id_same_process(&msg_ctx->id, &server)) {
480                 struct messaging_rec rec;
481                 uint8_t *buf;
482
483                 /*
484                  * Self-send, directly dispatch
485                  */
486
487                 buf = iov_buf(talloc_tos(), iov, iovlen);
488                 if (buf == NULL) {
489                         return NT_STATUS_NO_MEMORY;
490                 }
491
492                 rec = (struct messaging_rec) {
493                         .msg_version = MESSAGE_VERSION,
494                         .msg_type = msg_type & MSG_TYPE_MASK,
495                         .dest = server,
496                         .src = msg_ctx->id,
497                         .buf = data_blob_const(buf, talloc_get_size(buf)),
498                 };
499
500                 messaging_dispatch_rec(msg_ctx, &rec);
501                 TALLOC_FREE(buf);
502                 return NT_STATUS_OK;
503         }
504
505         hdr = (struct messaging_hdr) {
506                 .msg_type = msg_type,
507                 .dst = server,
508                 .src = msg_ctx->id
509         };
510         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
511         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
512
513         become_root();
514         ret = messaging_dgm_send(server.pid, iov2, iovlen+1, NULL, 0);
515         unbecome_root();
516
517         if (ret != 0) {
518                 return map_nt_error_from_unix(ret);
519         }
520         return NT_STATUS_OK;
521 }
522
523 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
524                                                struct messaging_rec *rec)
525 {
526         struct messaging_rec *result;
527         size_t fds_size = sizeof(int64_t) * rec->num_fds;
528
529         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
530                                       rec->buf.length + fds_size);
531         if (result == NULL) {
532                 return NULL;
533         }
534         *result = *rec;
535
536         /* Doesn't fail, see talloc_pooled_object */
537
538         result->buf.data = talloc_memdup(result, rec->buf.data,
539                                          rec->buf.length);
540
541         result->fds = NULL;
542         if (result->num_fds > 0) {
543                 result->fds = talloc_array(result, int64_t, result->num_fds);
544                 memcpy(result->fds, rec->fds, fds_size);
545         }
546
547         return result;
548 }
549
550 struct messaging_filtered_read_state {
551         struct tevent_context *ev;
552         struct messaging_context *msg_ctx;
553         void *tevent_handle;
554
555         bool (*filter)(struct messaging_rec *rec, void *private_data);
556         void *private_data;
557
558         struct messaging_rec *rec;
559 };
560
561 static void messaging_filtered_read_cleanup(struct tevent_req *req,
562                                             enum tevent_req_state req_state);
563
564 struct tevent_req *messaging_filtered_read_send(
565         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
566         struct messaging_context *msg_ctx,
567         bool (*filter)(struct messaging_rec *rec, void *private_data),
568         void *private_data)
569 {
570         struct tevent_req *req;
571         struct messaging_filtered_read_state *state;
572         size_t new_waiters_len;
573
574         req = tevent_req_create(mem_ctx, &state,
575                                 struct messaging_filtered_read_state);
576         if (req == NULL) {
577                 return NULL;
578         }
579         state->ev = ev;
580         state->msg_ctx = msg_ctx;
581         state->filter = filter;
582         state->private_data = private_data;
583
584         /*
585          * We have to defer the callback here, as we might be called from
586          * within a different tevent_context than state->ev
587          */
588         tevent_req_defer_callback(req, state->ev);
589
590         state->tevent_handle = messaging_dgm_register_tevent_context(
591                 state, ev);
592         if (tevent_req_nomem(state, req)) {
593                 return tevent_req_post(req, ev);
594         }
595
596         /*
597          * We add ourselves to the "new_waiters" array, not the "waiters"
598          * array. If we are called from within messaging_read_done,
599          * messaging_dispatch_rec will be in an active for-loop on
600          * "waiters". We must be careful not to mess with this array, because
601          * it could mean that a single event is being delivered twice.
602          */
603
604         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
605
606         if (new_waiters_len == msg_ctx->num_new_waiters) {
607                 struct tevent_req **tmp;
608
609                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
610                                      struct tevent_req *, new_waiters_len+1);
611                 if (tevent_req_nomem(tmp, req)) {
612                         return tevent_req_post(req, ev);
613                 }
614                 msg_ctx->new_waiters = tmp;
615         }
616
617         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
618         msg_ctx->num_new_waiters += 1;
619         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
620
621         return req;
622 }
623
624 static void messaging_filtered_read_cleanup(struct tevent_req *req,
625                                             enum tevent_req_state req_state)
626 {
627         struct messaging_filtered_read_state *state = tevent_req_data(
628                 req, struct messaging_filtered_read_state);
629         struct messaging_context *msg_ctx = state->msg_ctx;
630         unsigned i;
631
632         tevent_req_set_cleanup_fn(req, NULL);
633
634         TALLOC_FREE(state->tevent_handle);
635
636         /*
637          * Just set the [new_]waiters entry to NULL, be careful not to mess
638          * with the other "waiters" array contents. We are often called from
639          * within "messaging_dispatch_rec", which loops over
640          * "waiters". Messing with the "waiters" array will mess up that
641          * for-loop.
642          */
643
644         for (i=0; i<msg_ctx->num_waiters; i++) {
645                 if (msg_ctx->waiters[i] == req) {
646                         msg_ctx->waiters[i] = NULL;
647                         return;
648                 }
649         }
650
651         for (i=0; i<msg_ctx->num_new_waiters; i++) {
652                 if (msg_ctx->new_waiters[i] == req) {
653                         msg_ctx->new_waiters[i] = NULL;
654                         return;
655                 }
656         }
657 }
658
659 static void messaging_filtered_read_done(struct tevent_req *req,
660                                          struct messaging_rec *rec)
661 {
662         struct messaging_filtered_read_state *state = tevent_req_data(
663                 req, struct messaging_filtered_read_state);
664
665         state->rec = messaging_rec_dup(state, rec);
666         if (tevent_req_nomem(state->rec, req)) {
667                 return;
668         }
669         tevent_req_done(req);
670 }
671
672 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
673                                  struct messaging_rec **presult)
674 {
675         struct messaging_filtered_read_state *state = tevent_req_data(
676                 req, struct messaging_filtered_read_state);
677         int err;
678
679         if (tevent_req_is_unix_error(req, &err)) {
680                 tevent_req_received(req);
681                 return err;
682         }
683         *presult = talloc_move(mem_ctx, &state->rec);
684         return 0;
685 }
686
687 struct messaging_read_state {
688         uint32_t msg_type;
689         struct messaging_rec *rec;
690 };
691
692 static bool messaging_read_filter(struct messaging_rec *rec,
693                                   void *private_data);
694 static void messaging_read_done(struct tevent_req *subreq);
695
696 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
697                                        struct tevent_context *ev,
698                                        struct messaging_context *msg,
699                                        uint32_t msg_type)
700 {
701         struct tevent_req *req, *subreq;
702         struct messaging_read_state *state;
703
704         req = tevent_req_create(mem_ctx, &state,
705                                 struct messaging_read_state);
706         if (req == NULL) {
707                 return NULL;
708         }
709         state->msg_type = msg_type;
710
711         subreq = messaging_filtered_read_send(state, ev, msg,
712                                               messaging_read_filter, state);
713         if (tevent_req_nomem(subreq, req)) {
714                 return tevent_req_post(req, ev);
715         }
716         tevent_req_set_callback(subreq, messaging_read_done, req);
717         return req;
718 }
719
720 static bool messaging_read_filter(struct messaging_rec *rec,
721                                   void *private_data)
722 {
723         struct messaging_read_state *state = talloc_get_type_abort(
724                 private_data, struct messaging_read_state);
725
726         if (rec->num_fds != 0) {
727                 return false;
728         }
729
730         return rec->msg_type == state->msg_type;
731 }
732
733 static void messaging_read_done(struct tevent_req *subreq)
734 {
735         struct tevent_req *req = tevent_req_callback_data(
736                 subreq, struct tevent_req);
737         struct messaging_read_state *state = tevent_req_data(
738                 req, struct messaging_read_state);
739         int ret;
740
741         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
742         TALLOC_FREE(subreq);
743         if (tevent_req_error(req, ret)) {
744                 return;
745         }
746         tevent_req_done(req);
747 }
748
749 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
750                         struct messaging_rec **presult)
751 {
752         struct messaging_read_state *state = tevent_req_data(
753                 req, struct messaging_read_state);
754         int err;
755
756         if (tevent_req_is_unix_error(req, &err)) {
757                 return err;
758         }
759         if (presult != NULL) {
760                 *presult = talloc_move(mem_ctx, &state->rec);
761         }
762         return 0;
763 }
764
765 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
766 {
767         if (msg_ctx->num_new_waiters == 0) {
768                 return true;
769         }
770
771         if (talloc_array_length(msg_ctx->waiters) <
772             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
773                 struct tevent_req **tmp;
774                 tmp = talloc_realloc(
775                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
776                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
777                 if (tmp == NULL) {
778                         DEBUG(1, ("%s: talloc failed\n", __func__));
779                         return false;
780                 }
781                 msg_ctx->waiters = tmp;
782         }
783
784         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
785                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
786
787         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
788         msg_ctx->num_new_waiters = 0;
789
790         return true;
791 }
792
793 struct messaging_defer_callback_state {
794         struct messaging_context *msg_ctx;
795         struct messaging_rec *rec;
796         void (*fn)(struct messaging_context *msg, void *private_data,
797                    uint32_t msg_type, struct server_id server_id,
798                    DATA_BLOB *data);
799         void *private_data;
800 };
801
802 static void messaging_defer_callback_trigger(struct tevent_context *ev,
803                                              struct tevent_immediate *im,
804                                              void *private_data);
805
806 static void messaging_defer_callback(
807         struct messaging_context *msg_ctx, struct messaging_rec *rec,
808         void (*fn)(struct messaging_context *msg, void *private_data,
809                    uint32_t msg_type, struct server_id server_id,
810                    DATA_BLOB *data),
811         void *private_data)
812 {
813         struct messaging_defer_callback_state *state;
814         struct tevent_immediate *im;
815
816         state = talloc(msg_ctx, struct messaging_defer_callback_state);
817         if (state == NULL) {
818                 DEBUG(1, ("talloc failed\n"));
819                 return;
820         }
821         state->msg_ctx = msg_ctx;
822         state->fn = fn;
823         state->private_data = private_data;
824
825         state->rec = messaging_rec_dup(state, rec);
826         if (state->rec == NULL) {
827                 DEBUG(1, ("talloc failed\n"));
828                 TALLOC_FREE(state);
829                 return;
830         }
831
832         im = tevent_create_immediate(state);
833         if (im == NULL) {
834                 DEBUG(1, ("tevent_create_immediate failed\n"));
835                 TALLOC_FREE(state);
836                 return;
837         }
838         tevent_schedule_immediate(im, msg_ctx->event_ctx,
839                                   messaging_defer_callback_trigger, state);
840 }
841
842 static void messaging_defer_callback_trigger(struct tevent_context *ev,
843                                              struct tevent_immediate *im,
844                                              void *private_data)
845 {
846         struct messaging_defer_callback_state *state = talloc_get_type_abort(
847                 private_data, struct messaging_defer_callback_state);
848         struct messaging_rec *rec = state->rec;
849
850         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
851                   &rec->buf);
852         TALLOC_FREE(state);
853 }
854
855 /*
856   Dispatch one messaging_rec
857 */
858 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
859                             struct messaging_rec *rec)
860 {
861         struct messaging_callback *cb, *next;
862         unsigned i;
863         size_t j;
864
865         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
866                 next = cb->next;
867                 if (cb->msg_type != rec->msg_type) {
868                         continue;
869                 }
870
871                 /*
872                  * the old style callbacks don't support fd passing
873                  */
874                 for (j=0; j < rec->num_fds; j++) {
875                         int fd = rec->fds[j];
876                         close(fd);
877                 }
878                 rec->num_fds = 0;
879                 rec->fds = NULL;
880
881                 if (server_id_same_process(&rec->src, &rec->dest)) {
882                         /*
883                          * This is a self-send. We are called here from
884                          * messaging_send(), and we don't want to directly
885                          * recurse into the callback but go via a
886                          * tevent_loop_once
887                          */
888                         messaging_defer_callback(msg_ctx, rec, cb->fn,
889                                                  cb->private_data);
890                 } else {
891                         /*
892                          * This comes from a different process. we are called
893                          * from the event loop, so we should call back
894                          * directly.
895                          */
896                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
897                                rec->src, &rec->buf);
898                 }
899                 /*
900                  * we continue looking for matching messages after finding
901                  * one. This matters for subsystems like the internal notify
902                  * code which register more than one handler for the same
903                  * message type
904                  */
905         }
906
907         if (!messaging_append_new_waiters(msg_ctx)) {
908                 for (j=0; j < rec->num_fds; j++) {
909                         int fd = rec->fds[j];
910                         close(fd);
911                 }
912                 rec->num_fds = 0;
913                 rec->fds = NULL;
914                 return;
915         }
916
917         i = 0;
918         while (i < msg_ctx->num_waiters) {
919                 struct tevent_req *req;
920                 struct messaging_filtered_read_state *state;
921
922                 req = msg_ctx->waiters[i];
923                 if (req == NULL) {
924                         /*
925                          * This got cleaned up. In the meantime,
926                          * move everything down one. We need
927                          * to keep the order of waiters, as
928                          * other code may depend on this.
929                          */
930                         if (i < msg_ctx->num_waiters - 1) {
931                                 memmove(&msg_ctx->waiters[i],
932                                         &msg_ctx->waiters[i+1],
933                                         sizeof(struct tevent_req *) *
934                                             (msg_ctx->num_waiters - i - 1));
935                         }
936                         msg_ctx->num_waiters -= 1;
937                         continue;
938                 }
939
940                 state = tevent_req_data(
941                         req, struct messaging_filtered_read_state);
942                 if (state->filter(rec, state->private_data)) {
943                         messaging_filtered_read_done(req, rec);
944
945                         /*
946                          * Only the first one gets the fd-array
947                          */
948                         rec->num_fds = 0;
949                         rec->fds = NULL;
950                 }
951
952                 i += 1;
953         }
954
955         /*
956          * If the fd-array isn't used, just close it.
957          */
958         for (j=0; j < rec->num_fds; j++) {
959                 int fd = rec->fds[j];
960                 close(fd);
961         }
962         rec->num_fds = 0;
963         rec->fds = NULL;
964 }
965
966 static int mess_parent_dgm_cleanup(void *private_data);
967 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
968
969 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
970 {
971         struct tevent_req *req;
972
973         req = background_job_send(
974                 msg, msg->event_ctx, msg, NULL, 0,
975                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
976                             60*15),
977                 mess_parent_dgm_cleanup, msg);
978         if (req == NULL) {
979                 return false;
980         }
981         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
982         return true;
983 }
984
985 static int mess_parent_dgm_cleanup(void *private_data)
986 {
987         int ret;
988
989         ret = messaging_dgm_wipe();
990         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
991                    ret ? strerror(ret) : "ok"));
992         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
993                            60*15);
994 }
995
996 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
997 {
998         struct messaging_context *msg = tevent_req_callback_data(
999                 req, struct messaging_context);
1000         NTSTATUS status;
1001
1002         status = background_job_recv(req);
1003         TALLOC_FREE(req);
1004         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1005                   nt_errstr(status)));
1006
1007         req = background_job_send(
1008                 msg, msg->event_ctx, msg, NULL, 0,
1009                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1010                             60*15),
1011                 mess_parent_dgm_cleanup, msg);
1012         if (req == NULL) {
1013                 DEBUG(1, ("background_job_send failed\n"));
1014         }
1015         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1016 }
1017
1018 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1019 {
1020         int ret;
1021
1022         if (pid == 0) {
1023                 ret = messaging_dgm_wipe();
1024         } else {
1025                 ret = messaging_dgm_cleanup(pid);
1026         }
1027
1028         return ret;
1029 }
1030
1031 struct tevent_context *messaging_tevent_context(
1032         struct messaging_context *msg_ctx)
1033 {
1034         return msg_ctx->event_ctx;
1035 }
1036
1037 /** @} **/