]> git.samba.org - obnox/samba/samba-obnox.git/blob - source3/lib/messages.c
messaging3: Avoid self-send complexity
[obnox/samba/samba-obnox.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/iov_buf.h"
56
57 struct messaging_callback {
58         struct messaging_callback *prev, *next;
59         uint32 msg_type;
60         void (*fn)(struct messaging_context *msg, void *private_data, 
61                    uint32_t msg_type, 
62                    struct server_id server_id, DATA_BLOB *data);
63         void *private_data;
64 };
65
66 struct messaging_context {
67         struct server_id id;
68         struct tevent_context *event_ctx;
69         struct messaging_callback *callbacks;
70
71         struct tevent_req **new_waiters;
72         unsigned num_new_waiters;
73
74         struct tevent_req **waiters;
75         unsigned num_waiters;
76
77         struct messaging_backend *remote;
78 };
79
80 struct messaging_hdr {
81         uint32_t msg_type;
82         struct server_id dst;
83         struct server_id src;
84 };
85
86 /****************************************************************************
87  A useful function for testing the message system.
88 ****************************************************************************/
89
90 static void ping_message(struct messaging_context *msg_ctx,
91                          void *private_data,
92                          uint32_t msg_type,
93                          struct server_id src,
94                          DATA_BLOB *data)
95 {
96         struct server_id_buf idbuf;
97
98         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
99                   server_id_str_buf(src, &idbuf), (int)data->length,
100                   data->data ? (char *)data->data : ""));
101
102         messaging_send(msg_ctx, src, MSG_PONG, data);
103 }
104
105 /****************************************************************************
106  Register/replace a dispatch function for a particular message type.
107  JRA changed Dec 13 2006. Only one message handler now permitted per type.
108  *NOTE*: Dispatch functions must be able to cope with incoming
109  messages on an *odd* byte boundary.
110 ****************************************************************************/
111
112 struct msg_all {
113         struct messaging_context *msg_ctx;
114         int msg_type;
115         uint32 msg_flag;
116         const void *buf;
117         size_t len;
118         int n_sent;
119 };
120
121 /****************************************************************************
122  Send one of the messages for the broadcast.
123 ****************************************************************************/
124
125 static int traverse_fn(struct db_record *rec, const struct server_id *id,
126                        uint32_t msg_flags, void *state)
127 {
128         struct msg_all *msg_all = (struct msg_all *)state;
129         NTSTATUS status;
130
131         /* Don't send if the receiver hasn't registered an interest. */
132
133         if((msg_flags & msg_all->msg_flag) == 0) {
134                 return 0;
135         }
136
137         /* If the msg send fails because the pid was not found (i.e. smbd died), 
138          * the msg has already been deleted from the messages.tdb.*/
139
140         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
141                                     (const uint8_t *)msg_all->buf, msg_all->len);
142
143         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
144                 struct server_id_buf idbuf;
145
146                 /*
147                  * If the pid was not found delete the entry from
148                  * serverid.tdb
149                  */
150
151                 DEBUG(2, ("pid %s doesn't exist\n",
152                           server_id_str_buf(*id, &idbuf)));
153
154                 dbwrap_record_delete(rec);
155         }
156         msg_all->n_sent++;
157         return 0;
158 }
159
160 /**
161  * Send a message to all smbd processes.
162  *
163  * It isn't very efficient, but should be OK for the sorts of
164  * applications that use it. When we need efficient broadcast we can add
165  * it.
166  *
167  * @param n_sent Set to the number of messages sent.  This should be
168  * equal to the number of processes, but be careful for races.
169  *
170  * @retval True for success.
171  **/
172 bool message_send_all(struct messaging_context *msg_ctx,
173                       int msg_type,
174                       const void *buf, size_t len,
175                       int *n_sent)
176 {
177         struct msg_all msg_all;
178
179         msg_all.msg_type = msg_type;
180         if (msg_type < 0x100) {
181                 msg_all.msg_flag = FLAG_MSG_GENERAL;
182         } else if (msg_type > 0x100 && msg_type < 0x200) {
183                 msg_all.msg_flag = FLAG_MSG_NMBD;
184         } else if (msg_type > 0x200 && msg_type < 0x300) {
185                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
186         } else if (msg_type > 0x300 && msg_type < 0x400) {
187                 msg_all.msg_flag = FLAG_MSG_SMBD;
188         } else if (msg_type > 0x400 && msg_type < 0x600) {
189                 msg_all.msg_flag = FLAG_MSG_WINBIND;
190         } else if (msg_type > 4000 && msg_type < 5000) {
191                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
192         } else {
193                 return false;
194         }
195
196         msg_all.buf = buf;
197         msg_all.len = len;
198         msg_all.n_sent = 0;
199         msg_all.msg_ctx = msg_ctx;
200
201         serverid_traverse(traverse_fn, &msg_all);
202         if (n_sent)
203                 *n_sent = msg_all.n_sent;
204         return true;
205 }
206
207 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
208                               int *fds, size_t num_fds,
209                               void *private_data)
210 {
211         struct messaging_context *msg_ctx = talloc_get_type_abort(
212                 private_data, struct messaging_context);
213         const struct messaging_hdr *hdr;
214         struct server_id_buf idbuf;
215         struct messaging_rec rec;
216         int64_t fds64[MIN(num_fds, INT8_MAX)];
217         size_t i;
218
219         if (msg_len < sizeof(*hdr)) {
220                 for (i=0; i < num_fds; i++) {
221                         close(fds[i]);
222                 }
223                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
224                 return;
225         }
226
227         if (num_fds > INT8_MAX) {
228                 for (i=0; i < num_fds; i++) {
229                         close(fds[i]);
230                 }
231                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
232                 return;
233         }
234
235         /*
236          * "consume" the fds by copying them and setting
237          * the original variable to -1
238          */
239         for (i=0; i < num_fds; i++) {
240                 fds64[i] = fds[i];
241                 fds[i] = -1;
242         }
243
244         /*
245          * messages_dgm guarantees alignment, so we can cast here
246          */
247         hdr = (const struct messaging_hdr *)msg;
248
249         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
250                    __func__, (unsigned)hdr->msg_type,
251                    (unsigned)(msg_len - sizeof(*hdr)),
252                    (unsigned)num_fds,
253                    server_id_str_buf(hdr->src, &idbuf)));
254
255         rec = (struct messaging_rec) {
256                 .msg_version = MESSAGE_VERSION,
257                 .msg_type = hdr->msg_type,
258                 .src = hdr->src,
259                 .dest = hdr->dst,
260                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
261                 .buf.length = msg_len - sizeof(*hdr),
262                 .num_fds = num_fds,
263                 .fds = fds64,
264         };
265
266         messaging_dispatch_rec(msg_ctx, &rec);
267 }
268
269 static int messaging_context_destructor(struct messaging_context *ctx)
270 {
271         unsigned i;
272
273         messaging_dgm_destroy();
274
275         for (i=0; i<ctx->num_new_waiters; i++) {
276                 if (ctx->new_waiters[i] != NULL) {
277                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
278                         ctx->new_waiters[i] = NULL;
279                 }
280         }
281         for (i=0; i<ctx->num_waiters; i++) {
282                 if (ctx->waiters[i] != NULL) {
283                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
284                         ctx->waiters[i] = NULL;
285                 }
286         }
287
288         return 0;
289 }
290
291 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
292                                          struct tevent_context *ev)
293 {
294         struct messaging_context *ctx;
295         NTSTATUS status;
296         int ret;
297
298         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
299                 return NULL;
300         }
301
302         ctx->id = procid_self();
303         ctx->event_ctx = ev;
304
305         sec_init();
306
307         ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
308                                  lp_cache_directory(), sec_initial_uid(),
309                                  messaging_recv_cb, ctx);
310
311         if (ret != 0) {
312                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
313                 TALLOC_FREE(ctx);
314                 return NULL;
315         }
316
317         talloc_set_destructor(ctx, messaging_context_destructor);
318
319         if (lp_clustering()) {
320                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
321
322                 if (!NT_STATUS_IS_OK(status)) {
323                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
324                                   nt_errstr(status)));
325                         TALLOC_FREE(ctx);
326                         return NULL;
327                 }
328         }
329         ctx->id.vnn = get_my_vnn();
330
331         messaging_register(ctx, NULL, MSG_PING, ping_message);
332
333         /* Register some debugging related messages */
334
335         register_msg_pool_usage(ctx);
336         register_dmalloc_msgs(ctx);
337         debug_register_msgs(ctx);
338
339         return ctx;
340 }
341
342 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
343 {
344         return msg_ctx->id;
345 }
346
347 /*
348  * re-init after a fork
349  */
350 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
351 {
352         NTSTATUS status;
353         int ret;
354
355         messaging_dgm_destroy();
356
357         msg_ctx->id = procid_self();
358
359         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
360                                  lp_cache_directory(), sec_initial_uid(),
361                                  messaging_recv_cb, msg_ctx);
362         if (ret != 0) {
363                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
364                 return map_nt_error_from_unix(ret);
365         }
366
367         TALLOC_FREE(msg_ctx->remote);
368
369         if (lp_clustering()) {
370                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
371                                               &msg_ctx->remote);
372
373                 if (!NT_STATUS_IS_OK(status)) {
374                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
375                                   nt_errstr(status)));
376                         return status;
377                 }
378         }
379
380         return NT_STATUS_OK;
381 }
382
383
384 /*
385  * Register a dispatch function for a particular message type. Allow multiple
386  * registrants
387 */
388 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
389                             void *private_data,
390                             uint32_t msg_type,
391                             void (*fn)(struct messaging_context *msg,
392                                        void *private_data, 
393                                        uint32_t msg_type, 
394                                        struct server_id server_id,
395                                        DATA_BLOB *data))
396 {
397         struct messaging_callback *cb;
398
399         DEBUG(5, ("Registering messaging pointer for type %u - "
400                   "private_data=%p\n",
401                   (unsigned)msg_type, private_data));
402
403         /*
404          * Only one callback per type
405          */
406
407         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
408                 /* we allow a second registration of the same message
409                    type if it has a different private pointer. This is
410                    needed in, for example, the internal notify code,
411                    which creates a new notify context for each tree
412                    connect, and expects to receive messages to each of
413                    them. */
414                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
415                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
416                                   (unsigned)msg_type, private_data));
417                         cb->fn = fn;
418                         cb->private_data = private_data;
419                         return NT_STATUS_OK;
420                 }
421         }
422
423         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
424                 return NT_STATUS_NO_MEMORY;
425         }
426
427         cb->msg_type = msg_type;
428         cb->fn = fn;
429         cb->private_data = private_data;
430
431         DLIST_ADD(msg_ctx->callbacks, cb);
432         return NT_STATUS_OK;
433 }
434
435 /*
436   De-register the function for a particular message type.
437 */
438 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
439                           void *private_data)
440 {
441         struct messaging_callback *cb, *next;
442
443         for (cb = ctx->callbacks; cb; cb = next) {
444                 next = cb->next;
445                 if ((cb->msg_type == msg_type)
446                     && (cb->private_data == private_data)) {
447                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
448                                   (unsigned)msg_type, private_data));
449                         DLIST_REMOVE(ctx->callbacks, cb);
450                         TALLOC_FREE(cb);
451                 }
452         }
453 }
454
455 /*
456   Send a message to a particular server
457 */
458 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
459                         struct server_id server, uint32_t msg_type,
460                         const DATA_BLOB *data)
461 {
462         struct iovec iov;
463
464         iov.iov_base = data->data;
465         iov.iov_len = data->length;
466
467         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
468 }
469
470 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
471                             struct server_id server, uint32_t msg_type,
472                             const uint8_t *buf, size_t len)
473 {
474         DATA_BLOB blob = data_blob_const(buf, len);
475         return messaging_send(msg_ctx, server, msg_type, &blob);
476 }
477
478 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
479                             struct server_id server, uint32_t msg_type,
480                             const struct iovec *iov, int iovlen,
481                             const int *fds, size_t num_fds)
482 {
483         int ret;
484         struct messaging_hdr hdr;
485         struct iovec iov2[iovlen+1];
486
487         if (server_id_is_disconnected(&server)) {
488                 return NT_STATUS_INVALID_PARAMETER_MIX;
489         }
490
491         if (num_fds > INT8_MAX) {
492                 return NT_STATUS_INVALID_PARAMETER_MIX;
493         }
494
495         if (!procid_is_local(&server)) {
496                 if (num_fds > 0) {
497                         return NT_STATUS_NOT_SUPPORTED;
498                 }
499
500                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
501                                                msg_type, iov, iovlen,
502                                                NULL, 0,
503                                                msg_ctx->remote);
504                 if (ret != 0) {
505                         return map_nt_error_from_unix(ret);
506                 }
507                 return NT_STATUS_OK;
508         }
509
510         ZERO_STRUCT(hdr);
511         hdr = (struct messaging_hdr) {
512                 .msg_type = msg_type,
513                 .dst = server,
514                 .src = msg_ctx->id
515         };
516         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
517         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
518
519         become_root();
520         ret = messaging_dgm_send(server.pid, iov2, iovlen+1, fds, num_fds);
521         unbecome_root();
522
523         if (ret != 0) {
524                 return map_nt_error_from_unix(ret);
525         }
526         return NT_STATUS_OK;
527 }
528
529 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
530                                                struct messaging_rec *rec)
531 {
532         struct messaging_rec *result;
533         size_t fds_size = sizeof(int64_t) * rec->num_fds;
534
535         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
536                                       rec->buf.length + fds_size);
537         if (result == NULL) {
538                 return NULL;
539         }
540         *result = *rec;
541
542         /* Doesn't fail, see talloc_pooled_object */
543
544         result->buf.data = talloc_memdup(result, rec->buf.data,
545                                          rec->buf.length);
546
547         result->fds = NULL;
548         if (result->num_fds > 0) {
549                 result->fds = talloc_array(result, int64_t, result->num_fds);
550                 memcpy(result->fds, rec->fds, fds_size);
551         }
552
553         return result;
554 }
555
556 struct messaging_filtered_read_state {
557         struct tevent_context *ev;
558         struct messaging_context *msg_ctx;
559         void *tevent_handle;
560
561         bool (*filter)(struct messaging_rec *rec, void *private_data);
562         void *private_data;
563
564         struct messaging_rec *rec;
565 };
566
567 static void messaging_filtered_read_cleanup(struct tevent_req *req,
568                                             enum tevent_req_state req_state);
569
570 struct tevent_req *messaging_filtered_read_send(
571         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
572         struct messaging_context *msg_ctx,
573         bool (*filter)(struct messaging_rec *rec, void *private_data),
574         void *private_data)
575 {
576         struct tevent_req *req;
577         struct messaging_filtered_read_state *state;
578         size_t new_waiters_len;
579
580         req = tevent_req_create(mem_ctx, &state,
581                                 struct messaging_filtered_read_state);
582         if (req == NULL) {
583                 return NULL;
584         }
585         state->ev = ev;
586         state->msg_ctx = msg_ctx;
587         state->filter = filter;
588         state->private_data = private_data;
589
590         /*
591          * We have to defer the callback here, as we might be called from
592          * within a different tevent_context than state->ev
593          */
594         tevent_req_defer_callback(req, state->ev);
595
596         state->tevent_handle = messaging_dgm_register_tevent_context(
597                 state, ev);
598         if (tevent_req_nomem(state, req)) {
599                 return tevent_req_post(req, ev);
600         }
601
602         /*
603          * We add ourselves to the "new_waiters" array, not the "waiters"
604          * array. If we are called from within messaging_read_done,
605          * messaging_dispatch_rec will be in an active for-loop on
606          * "waiters". We must be careful not to mess with this array, because
607          * it could mean that a single event is being delivered twice.
608          */
609
610         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
611
612         if (new_waiters_len == msg_ctx->num_new_waiters) {
613                 struct tevent_req **tmp;
614
615                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
616                                      struct tevent_req *, new_waiters_len+1);
617                 if (tevent_req_nomem(tmp, req)) {
618                         return tevent_req_post(req, ev);
619                 }
620                 msg_ctx->new_waiters = tmp;
621         }
622
623         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
624         msg_ctx->num_new_waiters += 1;
625         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
626
627         return req;
628 }
629
630 static void messaging_filtered_read_cleanup(struct tevent_req *req,
631                                             enum tevent_req_state req_state)
632 {
633         struct messaging_filtered_read_state *state = tevent_req_data(
634                 req, struct messaging_filtered_read_state);
635         struct messaging_context *msg_ctx = state->msg_ctx;
636         unsigned i;
637
638         tevent_req_set_cleanup_fn(req, NULL);
639
640         TALLOC_FREE(state->tevent_handle);
641
642         /*
643          * Just set the [new_]waiters entry to NULL, be careful not to mess
644          * with the other "waiters" array contents. We are often called from
645          * within "messaging_dispatch_rec", which loops over
646          * "waiters". Messing with the "waiters" array will mess up that
647          * for-loop.
648          */
649
650         for (i=0; i<msg_ctx->num_waiters; i++) {
651                 if (msg_ctx->waiters[i] == req) {
652                         msg_ctx->waiters[i] = NULL;
653                         return;
654                 }
655         }
656
657         for (i=0; i<msg_ctx->num_new_waiters; i++) {
658                 if (msg_ctx->new_waiters[i] == req) {
659                         msg_ctx->new_waiters[i] = NULL;
660                         return;
661                 }
662         }
663 }
664
665 static void messaging_filtered_read_done(struct tevent_req *req,
666                                          struct messaging_rec *rec)
667 {
668         struct messaging_filtered_read_state *state = tevent_req_data(
669                 req, struct messaging_filtered_read_state);
670
671         state->rec = messaging_rec_dup(state, rec);
672         if (tevent_req_nomem(state->rec, req)) {
673                 return;
674         }
675         tevent_req_done(req);
676 }
677
678 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
679                                  struct messaging_rec **presult)
680 {
681         struct messaging_filtered_read_state *state = tevent_req_data(
682                 req, struct messaging_filtered_read_state);
683         int err;
684
685         if (tevent_req_is_unix_error(req, &err)) {
686                 tevent_req_received(req);
687                 return err;
688         }
689         *presult = talloc_move(mem_ctx, &state->rec);
690         return 0;
691 }
692
693 struct messaging_read_state {
694         uint32_t msg_type;
695         struct messaging_rec *rec;
696 };
697
698 static bool messaging_read_filter(struct messaging_rec *rec,
699                                   void *private_data);
700 static void messaging_read_done(struct tevent_req *subreq);
701
702 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
703                                        struct tevent_context *ev,
704                                        struct messaging_context *msg,
705                                        uint32_t msg_type)
706 {
707         struct tevent_req *req, *subreq;
708         struct messaging_read_state *state;
709
710         req = tevent_req_create(mem_ctx, &state,
711                                 struct messaging_read_state);
712         if (req == NULL) {
713                 return NULL;
714         }
715         state->msg_type = msg_type;
716
717         subreq = messaging_filtered_read_send(state, ev, msg,
718                                               messaging_read_filter, state);
719         if (tevent_req_nomem(subreq, req)) {
720                 return tevent_req_post(req, ev);
721         }
722         tevent_req_set_callback(subreq, messaging_read_done, req);
723         return req;
724 }
725
726 static bool messaging_read_filter(struct messaging_rec *rec,
727                                   void *private_data)
728 {
729         struct messaging_read_state *state = talloc_get_type_abort(
730                 private_data, struct messaging_read_state);
731
732         if (rec->num_fds != 0) {
733                 return false;
734         }
735
736         return rec->msg_type == state->msg_type;
737 }
738
739 static void messaging_read_done(struct tevent_req *subreq)
740 {
741         struct tevent_req *req = tevent_req_callback_data(
742                 subreq, struct tevent_req);
743         struct messaging_read_state *state = tevent_req_data(
744                 req, struct messaging_read_state);
745         int ret;
746
747         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
748         TALLOC_FREE(subreq);
749         if (tevent_req_error(req, ret)) {
750                 return;
751         }
752         tevent_req_done(req);
753 }
754
755 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
756                         struct messaging_rec **presult)
757 {
758         struct messaging_read_state *state = tevent_req_data(
759                 req, struct messaging_read_state);
760         int err;
761
762         if (tevent_req_is_unix_error(req, &err)) {
763                 return err;
764         }
765         if (presult != NULL) {
766                 *presult = talloc_move(mem_ctx, &state->rec);
767         }
768         return 0;
769 }
770
771 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
772 {
773         if (msg_ctx->num_new_waiters == 0) {
774                 return true;
775         }
776
777         if (talloc_array_length(msg_ctx->waiters) <
778             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
779                 struct tevent_req **tmp;
780                 tmp = talloc_realloc(
781                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
782                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
783                 if (tmp == NULL) {
784                         DEBUG(1, ("%s: talloc failed\n", __func__));
785                         return false;
786                 }
787                 msg_ctx->waiters = tmp;
788         }
789
790         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
791                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
792
793         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
794         msg_ctx->num_new_waiters = 0;
795
796         return true;
797 }
798
799 /*
800   Dispatch one messaging_rec
801 */
802 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
803                             struct messaging_rec *rec)
804 {
805         struct messaging_callback *cb, *next;
806         unsigned i;
807         size_t j;
808
809         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
810                 next = cb->next;
811                 if (cb->msg_type != rec->msg_type) {
812                         continue;
813                 }
814
815                 /*
816                  * the old style callbacks don't support fd passing
817                  */
818                 for (j=0; j < rec->num_fds; j++) {
819                         int fd = rec->fds[j];
820                         close(fd);
821                 }
822                 rec->num_fds = 0;
823                 rec->fds = NULL;
824
825                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
826                        rec->src, &rec->buf);
827
828                 /*
829                  * we continue looking for matching messages after finding
830                  * one. This matters for subsystems like the internal notify
831                  * code which register more than one handler for the same
832                  * message type
833                  */
834         }
835
836         if (!messaging_append_new_waiters(msg_ctx)) {
837                 for (j=0; j < rec->num_fds; j++) {
838                         int fd = rec->fds[j];
839                         close(fd);
840                 }
841                 rec->num_fds = 0;
842                 rec->fds = NULL;
843                 return;
844         }
845
846         i = 0;
847         while (i < msg_ctx->num_waiters) {
848                 struct tevent_req *req;
849                 struct messaging_filtered_read_state *state;
850
851                 req = msg_ctx->waiters[i];
852                 if (req == NULL) {
853                         /*
854                          * This got cleaned up. In the meantime,
855                          * move everything down one. We need
856                          * to keep the order of waiters, as
857                          * other code may depend on this.
858                          */
859                         if (i < msg_ctx->num_waiters - 1) {
860                                 memmove(&msg_ctx->waiters[i],
861                                         &msg_ctx->waiters[i+1],
862                                         sizeof(struct tevent_req *) *
863                                             (msg_ctx->num_waiters - i - 1));
864                         }
865                         msg_ctx->num_waiters -= 1;
866                         continue;
867                 }
868
869                 state = tevent_req_data(
870                         req, struct messaging_filtered_read_state);
871                 if (state->filter(rec, state->private_data)) {
872                         messaging_filtered_read_done(req, rec);
873
874                         /*
875                          * Only the first one gets the fd-array
876                          */
877                         rec->num_fds = 0;
878                         rec->fds = NULL;
879                 }
880
881                 i += 1;
882         }
883
884         /*
885          * If the fd-array isn't used, just close it.
886          */
887         for (j=0; j < rec->num_fds; j++) {
888                 int fd = rec->fds[j];
889                 close(fd);
890         }
891         rec->num_fds = 0;
892         rec->fds = NULL;
893 }
894
895 static int mess_parent_dgm_cleanup(void *private_data);
896 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
897
898 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
899 {
900         struct tevent_req *req;
901
902         req = background_job_send(
903                 msg, msg->event_ctx, msg, NULL, 0,
904                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
905                             60*15),
906                 mess_parent_dgm_cleanup, msg);
907         if (req == NULL) {
908                 return false;
909         }
910         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
911         return true;
912 }
913
914 static int mess_parent_dgm_cleanup(void *private_data)
915 {
916         int ret;
917
918         ret = messaging_dgm_wipe();
919         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
920                    ret ? strerror(ret) : "ok"));
921         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
922                            60*15);
923 }
924
925 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
926 {
927         struct messaging_context *msg = tevent_req_callback_data(
928                 req, struct messaging_context);
929         NTSTATUS status;
930
931         status = background_job_recv(req);
932         TALLOC_FREE(req);
933         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
934                   nt_errstr(status)));
935
936         req = background_job_send(
937                 msg, msg->event_ctx, msg, NULL, 0,
938                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
939                             60*15),
940                 mess_parent_dgm_cleanup, msg);
941         if (req == NULL) {
942                 DEBUG(1, ("background_job_send failed\n"));
943         }
944         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
945 }
946
947 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
948 {
949         int ret;
950
951         if (pid == 0) {
952                 ret = messaging_dgm_wipe();
953         } else {
954                 ret = messaging_dgm_cleanup(pid);
955         }
956
957         return ret;
958 }
959
960 struct tevent_context *messaging_tevent_context(
961         struct messaging_context *msg_ctx)
962 {
963         return msg_ctx->event_ctx;
964 }
965
966 /** @} **/