27114687189ed8fcd86ba93c499cbf51913782f7
[obnox/samba/samba-obnox.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57
58 struct messaging_callback {
59         struct messaging_callback *prev, *next;
60         uint32 msg_type;
61         void (*fn)(struct messaging_context *msg, void *private_data, 
62                    uint32_t msg_type, 
63                    struct server_id server_id, DATA_BLOB *data);
64         void *private_data;
65 };
66
67 struct messaging_context {
68         struct server_id id;
69         struct tevent_context *event_ctx;
70         struct messaging_callback *callbacks;
71
72         struct tevent_req **new_waiters;
73         unsigned num_new_waiters;
74
75         struct tevent_req **waiters;
76         unsigned num_waiters;
77
78         struct messaging_backend *remote;
79
80         struct server_id_db *names_db;
81 };
82
83 struct messaging_hdr {
84         uint32_t msg_type;
85         struct server_id dst;
86         struct server_id src;
87 };
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32 msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         const struct messaging_hdr *hdr;
217         struct server_id_buf idbuf;
218         struct messaging_rec rec;
219         int64_t fds64[MIN(num_fds, INT8_MAX)];
220         size_t i;
221
222         if (msg_len < sizeof(*hdr)) {
223                 for (i=0; i < num_fds; i++) {
224                         close(fds[i]);
225                 }
226                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
227                 return;
228         }
229
230         if (num_fds > INT8_MAX) {
231                 for (i=0; i < num_fds; i++) {
232                         close(fds[i]);
233                 }
234                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
235                 return;
236         }
237
238         /*
239          * "consume" the fds by copying them and setting
240          * the original variable to -1
241          */
242         for (i=0; i < num_fds; i++) {
243                 fds64[i] = fds[i];
244                 fds[i] = -1;
245         }
246
247         /*
248          * messages_dgm guarantees alignment, so we can cast here
249          */
250         hdr = (const struct messaging_hdr *)msg;
251
252         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
253                    __func__, (unsigned)hdr->msg_type,
254                    (unsigned)(msg_len - sizeof(*hdr)),
255                    (unsigned)num_fds,
256                    server_id_str_buf(hdr->src, &idbuf)));
257
258         rec = (struct messaging_rec) {
259                 .msg_version = MESSAGE_VERSION,
260                 .msg_type = hdr->msg_type,
261                 .src = hdr->src,
262                 .dest = hdr->dst,
263                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
264                 .buf.length = msg_len - sizeof(*hdr),
265                 .num_fds = num_fds,
266                 .fds = fds64,
267         };
268
269         messaging_dispatch_rec(msg_ctx, &rec);
270 }
271
272 static int messaging_context_destructor(struct messaging_context *ctx)
273 {
274         unsigned i;
275
276         messaging_dgm_destroy();
277
278         for (i=0; i<ctx->num_new_waiters; i++) {
279                 if (ctx->new_waiters[i] != NULL) {
280                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
281                         ctx->new_waiters[i] = NULL;
282                 }
283         }
284         for (i=0; i<ctx->num_waiters; i++) {
285                 if (ctx->waiters[i] != NULL) {
286                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
287                         ctx->waiters[i] = NULL;
288                 }
289         }
290
291         return 0;
292 }
293
294 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
295                                          struct tevent_context *ev)
296 {
297         struct messaging_context *ctx;
298         NTSTATUS status;
299         int ret;
300
301         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
302                 return NULL;
303         }
304
305         ctx->id = procid_self();
306         ctx->event_ctx = ev;
307
308         sec_init();
309
310         ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
311                                  lp_cache_directory(), sec_initial_uid(),
312                                  messaging_recv_cb, ctx);
313
314         if (ret != 0) {
315                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
316                 TALLOC_FREE(ctx);
317                 return NULL;
318         }
319
320         ctx->names_db = server_id_db_init(
321                 ctx, ctx->id, lp_cache_directory(), 0,
322                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
323         if (ctx->names_db == NULL) {
324                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
325                 TALLOC_FREE(ctx);
326                 return NULL;
327         }
328
329         talloc_set_destructor(ctx, messaging_context_destructor);
330
331         if (lp_clustering()) {
332                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
333
334                 if (!NT_STATUS_IS_OK(status)) {
335                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
336                                   nt_errstr(status)));
337                         TALLOC_FREE(ctx);
338                         return NULL;
339                 }
340         }
341         ctx->id.vnn = get_my_vnn();
342
343         messaging_register(ctx, NULL, MSG_PING, ping_message);
344
345         /* Register some debugging related messages */
346
347         register_msg_pool_usage(ctx);
348         register_dmalloc_msgs(ctx);
349         debug_register_msgs(ctx);
350
351         return ctx;
352 }
353
354 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
355 {
356         return msg_ctx->id;
357 }
358
359 /*
360  * re-init after a fork
361  */
362 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
363 {
364         NTSTATUS status;
365         int ret;
366
367         messaging_dgm_destroy();
368
369         msg_ctx->id = procid_self();
370
371         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
372                                  lp_cache_directory(), sec_initial_uid(),
373                                  messaging_recv_cb, msg_ctx);
374         if (ret != 0) {
375                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
376                 return map_nt_error_from_unix(ret);
377         }
378
379         TALLOC_FREE(msg_ctx->remote);
380
381         if (lp_clustering()) {
382                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
383                                               &msg_ctx->remote);
384
385                 if (!NT_STATUS_IS_OK(status)) {
386                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
387                                   nt_errstr(status)));
388                         return status;
389                 }
390         }
391
392         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
393
394         return NT_STATUS_OK;
395 }
396
397
398 /*
399  * Register a dispatch function for a particular message type. Allow multiple
400  * registrants
401 */
402 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
403                             void *private_data,
404                             uint32_t msg_type,
405                             void (*fn)(struct messaging_context *msg,
406                                        void *private_data, 
407                                        uint32_t msg_type, 
408                                        struct server_id server_id,
409                                        DATA_BLOB *data))
410 {
411         struct messaging_callback *cb;
412
413         DEBUG(5, ("Registering messaging pointer for type %u - "
414                   "private_data=%p\n",
415                   (unsigned)msg_type, private_data));
416
417         /*
418          * Only one callback per type
419          */
420
421         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
422                 /* we allow a second registration of the same message
423                    type if it has a different private pointer. This is
424                    needed in, for example, the internal notify code,
425                    which creates a new notify context for each tree
426                    connect, and expects to receive messages to each of
427                    them. */
428                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
429                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
430                                   (unsigned)msg_type, private_data));
431                         cb->fn = fn;
432                         cb->private_data = private_data;
433                         return NT_STATUS_OK;
434                 }
435         }
436
437         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
438                 return NT_STATUS_NO_MEMORY;
439         }
440
441         cb->msg_type = msg_type;
442         cb->fn = fn;
443         cb->private_data = private_data;
444
445         DLIST_ADD(msg_ctx->callbacks, cb);
446         return NT_STATUS_OK;
447 }
448
449 /*
450   De-register the function for a particular message type.
451 */
452 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
453                           void *private_data)
454 {
455         struct messaging_callback *cb, *next;
456
457         for (cb = ctx->callbacks; cb; cb = next) {
458                 next = cb->next;
459                 if ((cb->msg_type == msg_type)
460                     && (cb->private_data == private_data)) {
461                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
462                                   (unsigned)msg_type, private_data));
463                         DLIST_REMOVE(ctx->callbacks, cb);
464                         TALLOC_FREE(cb);
465                 }
466         }
467 }
468
469 /*
470   Send a message to a particular server
471 */
472 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
473                         struct server_id server, uint32_t msg_type,
474                         const DATA_BLOB *data)
475 {
476         struct iovec iov;
477
478         iov.iov_base = data->data;
479         iov.iov_len = data->length;
480
481         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
482 }
483
484 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
485                             struct server_id server, uint32_t msg_type,
486                             const uint8_t *buf, size_t len)
487 {
488         DATA_BLOB blob = data_blob_const(buf, len);
489         return messaging_send(msg_ctx, server, msg_type, &blob);
490 }
491
492 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
493                             struct server_id server, uint32_t msg_type,
494                             const struct iovec *iov, int iovlen,
495                             const int *fds, size_t num_fds)
496 {
497         int ret;
498         struct messaging_hdr hdr;
499         struct iovec iov2[iovlen+1];
500
501         if (server_id_is_disconnected(&server)) {
502                 return NT_STATUS_INVALID_PARAMETER_MIX;
503         }
504
505         if (num_fds > INT8_MAX) {
506                 return NT_STATUS_INVALID_PARAMETER_MIX;
507         }
508
509         if (!procid_is_local(&server)) {
510                 if (num_fds > 0) {
511                         return NT_STATUS_NOT_SUPPORTED;
512                 }
513
514                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
515                                                msg_type, iov, iovlen,
516                                                NULL, 0,
517                                                msg_ctx->remote);
518                 if (ret != 0) {
519                         return map_nt_error_from_unix(ret);
520                 }
521                 return NT_STATUS_OK;
522         }
523
524         ZERO_STRUCT(hdr);
525         hdr = (struct messaging_hdr) {
526                 .msg_type = msg_type,
527                 .dst = server,
528                 .src = msg_ctx->id
529         };
530         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
531         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
532
533         become_root();
534         ret = messaging_dgm_send(server.pid, iov2, iovlen+1, fds, num_fds);
535         unbecome_root();
536
537         if (ret != 0) {
538                 return map_nt_error_from_unix(ret);
539         }
540         return NT_STATUS_OK;
541 }
542
543 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
544                                                struct messaging_rec *rec)
545 {
546         struct messaging_rec *result;
547         size_t fds_size = sizeof(int64_t) * rec->num_fds;
548
549         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
550                                       rec->buf.length + fds_size);
551         if (result == NULL) {
552                 return NULL;
553         }
554         *result = *rec;
555
556         /* Doesn't fail, see talloc_pooled_object */
557
558         result->buf.data = talloc_memdup(result, rec->buf.data,
559                                          rec->buf.length);
560
561         result->fds = NULL;
562         if (result->num_fds > 0) {
563                 result->fds = talloc_array(result, int64_t, result->num_fds);
564                 memcpy(result->fds, rec->fds, fds_size);
565         }
566
567         return result;
568 }
569
570 struct messaging_filtered_read_state {
571         struct tevent_context *ev;
572         struct messaging_context *msg_ctx;
573         void *tevent_handle;
574
575         bool (*filter)(struct messaging_rec *rec, void *private_data);
576         void *private_data;
577
578         struct messaging_rec *rec;
579 };
580
581 static void messaging_filtered_read_cleanup(struct tevent_req *req,
582                                             enum tevent_req_state req_state);
583
584 struct tevent_req *messaging_filtered_read_send(
585         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
586         struct messaging_context *msg_ctx,
587         bool (*filter)(struct messaging_rec *rec, void *private_data),
588         void *private_data)
589 {
590         struct tevent_req *req;
591         struct messaging_filtered_read_state *state;
592         size_t new_waiters_len;
593
594         req = tevent_req_create(mem_ctx, &state,
595                                 struct messaging_filtered_read_state);
596         if (req == NULL) {
597                 return NULL;
598         }
599         state->ev = ev;
600         state->msg_ctx = msg_ctx;
601         state->filter = filter;
602         state->private_data = private_data;
603
604         /*
605          * We have to defer the callback here, as we might be called from
606          * within a different tevent_context than state->ev
607          */
608         tevent_req_defer_callback(req, state->ev);
609
610         state->tevent_handle = messaging_dgm_register_tevent_context(
611                 state, ev);
612         if (tevent_req_nomem(state, req)) {
613                 return tevent_req_post(req, ev);
614         }
615
616         /*
617          * We add ourselves to the "new_waiters" array, not the "waiters"
618          * array. If we are called from within messaging_read_done,
619          * messaging_dispatch_rec will be in an active for-loop on
620          * "waiters". We must be careful not to mess with this array, because
621          * it could mean that a single event is being delivered twice.
622          */
623
624         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
625
626         if (new_waiters_len == msg_ctx->num_new_waiters) {
627                 struct tevent_req **tmp;
628
629                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
630                                      struct tevent_req *, new_waiters_len+1);
631                 if (tevent_req_nomem(tmp, req)) {
632                         return tevent_req_post(req, ev);
633                 }
634                 msg_ctx->new_waiters = tmp;
635         }
636
637         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
638         msg_ctx->num_new_waiters += 1;
639         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
640
641         return req;
642 }
643
644 static void messaging_filtered_read_cleanup(struct tevent_req *req,
645                                             enum tevent_req_state req_state)
646 {
647         struct messaging_filtered_read_state *state = tevent_req_data(
648                 req, struct messaging_filtered_read_state);
649         struct messaging_context *msg_ctx = state->msg_ctx;
650         unsigned i;
651
652         tevent_req_set_cleanup_fn(req, NULL);
653
654         TALLOC_FREE(state->tevent_handle);
655
656         /*
657          * Just set the [new_]waiters entry to NULL, be careful not to mess
658          * with the other "waiters" array contents. We are often called from
659          * within "messaging_dispatch_rec", which loops over
660          * "waiters". Messing with the "waiters" array will mess up that
661          * for-loop.
662          */
663
664         for (i=0; i<msg_ctx->num_waiters; i++) {
665                 if (msg_ctx->waiters[i] == req) {
666                         msg_ctx->waiters[i] = NULL;
667                         return;
668                 }
669         }
670
671         for (i=0; i<msg_ctx->num_new_waiters; i++) {
672                 if (msg_ctx->new_waiters[i] == req) {
673                         msg_ctx->new_waiters[i] = NULL;
674                         return;
675                 }
676         }
677 }
678
679 static void messaging_filtered_read_done(struct tevent_req *req,
680                                          struct messaging_rec *rec)
681 {
682         struct messaging_filtered_read_state *state = tevent_req_data(
683                 req, struct messaging_filtered_read_state);
684
685         state->rec = messaging_rec_dup(state, rec);
686         if (tevent_req_nomem(state->rec, req)) {
687                 return;
688         }
689         tevent_req_done(req);
690 }
691
692 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
693                                  struct messaging_rec **presult)
694 {
695         struct messaging_filtered_read_state *state = tevent_req_data(
696                 req, struct messaging_filtered_read_state);
697         int err;
698
699         if (tevent_req_is_unix_error(req, &err)) {
700                 tevent_req_received(req);
701                 return err;
702         }
703         *presult = talloc_move(mem_ctx, &state->rec);
704         return 0;
705 }
706
707 struct messaging_read_state {
708         uint32_t msg_type;
709         struct messaging_rec *rec;
710 };
711
712 static bool messaging_read_filter(struct messaging_rec *rec,
713                                   void *private_data);
714 static void messaging_read_done(struct tevent_req *subreq);
715
716 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
717                                        struct tevent_context *ev,
718                                        struct messaging_context *msg,
719                                        uint32_t msg_type)
720 {
721         struct tevent_req *req, *subreq;
722         struct messaging_read_state *state;
723
724         req = tevent_req_create(mem_ctx, &state,
725                                 struct messaging_read_state);
726         if (req == NULL) {
727                 return NULL;
728         }
729         state->msg_type = msg_type;
730
731         subreq = messaging_filtered_read_send(state, ev, msg,
732                                               messaging_read_filter, state);
733         if (tevent_req_nomem(subreq, req)) {
734                 return tevent_req_post(req, ev);
735         }
736         tevent_req_set_callback(subreq, messaging_read_done, req);
737         return req;
738 }
739
740 static bool messaging_read_filter(struct messaging_rec *rec,
741                                   void *private_data)
742 {
743         struct messaging_read_state *state = talloc_get_type_abort(
744                 private_data, struct messaging_read_state);
745
746         if (rec->num_fds != 0) {
747                 return false;
748         }
749
750         return rec->msg_type == state->msg_type;
751 }
752
753 static void messaging_read_done(struct tevent_req *subreq)
754 {
755         struct tevent_req *req = tevent_req_callback_data(
756                 subreq, struct tevent_req);
757         struct messaging_read_state *state = tevent_req_data(
758                 req, struct messaging_read_state);
759         int ret;
760
761         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
762         TALLOC_FREE(subreq);
763         if (tevent_req_error(req, ret)) {
764                 return;
765         }
766         tevent_req_done(req);
767 }
768
769 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
770                         struct messaging_rec **presult)
771 {
772         struct messaging_read_state *state = tevent_req_data(
773                 req, struct messaging_read_state);
774         int err;
775
776         if (tevent_req_is_unix_error(req, &err)) {
777                 return err;
778         }
779         if (presult != NULL) {
780                 *presult = talloc_move(mem_ctx, &state->rec);
781         }
782         return 0;
783 }
784
785 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
786 {
787         if (msg_ctx->num_new_waiters == 0) {
788                 return true;
789         }
790
791         if (talloc_array_length(msg_ctx->waiters) <
792             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
793                 struct tevent_req **tmp;
794                 tmp = talloc_realloc(
795                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
796                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
797                 if (tmp == NULL) {
798                         DEBUG(1, ("%s: talloc failed\n", __func__));
799                         return false;
800                 }
801                 msg_ctx->waiters = tmp;
802         }
803
804         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
805                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
806
807         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
808         msg_ctx->num_new_waiters = 0;
809
810         return true;
811 }
812
813 /*
814   Dispatch one messaging_rec
815 */
816 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
817                             struct messaging_rec *rec)
818 {
819         struct messaging_callback *cb, *next;
820         unsigned i;
821         size_t j;
822
823         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
824                 next = cb->next;
825                 if (cb->msg_type != rec->msg_type) {
826                         continue;
827                 }
828
829                 /*
830                  * the old style callbacks don't support fd passing
831                  */
832                 for (j=0; j < rec->num_fds; j++) {
833                         int fd = rec->fds[j];
834                         close(fd);
835                 }
836                 rec->num_fds = 0;
837                 rec->fds = NULL;
838
839                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
840                        rec->src, &rec->buf);
841
842                 /*
843                  * we continue looking for matching messages after finding
844                  * one. This matters for subsystems like the internal notify
845                  * code which register more than one handler for the same
846                  * message type
847                  */
848         }
849
850         if (!messaging_append_new_waiters(msg_ctx)) {
851                 for (j=0; j < rec->num_fds; j++) {
852                         int fd = rec->fds[j];
853                         close(fd);
854                 }
855                 rec->num_fds = 0;
856                 rec->fds = NULL;
857                 return;
858         }
859
860         i = 0;
861         while (i < msg_ctx->num_waiters) {
862                 struct tevent_req *req;
863                 struct messaging_filtered_read_state *state;
864
865                 req = msg_ctx->waiters[i];
866                 if (req == NULL) {
867                         /*
868                          * This got cleaned up. In the meantime,
869                          * move everything down one. We need
870                          * to keep the order of waiters, as
871                          * other code may depend on this.
872                          */
873                         if (i < msg_ctx->num_waiters - 1) {
874                                 memmove(&msg_ctx->waiters[i],
875                                         &msg_ctx->waiters[i+1],
876                                         sizeof(struct tevent_req *) *
877                                             (msg_ctx->num_waiters - i - 1));
878                         }
879                         msg_ctx->num_waiters -= 1;
880                         continue;
881                 }
882
883                 state = tevent_req_data(
884                         req, struct messaging_filtered_read_state);
885                 if (state->filter(rec, state->private_data)) {
886                         messaging_filtered_read_done(req, rec);
887
888                         /*
889                          * Only the first one gets the fd-array
890                          */
891                         rec->num_fds = 0;
892                         rec->fds = NULL;
893                 }
894
895                 i += 1;
896         }
897
898         /*
899          * If the fd-array isn't used, just close it.
900          */
901         for (j=0; j < rec->num_fds; j++) {
902                 int fd = rec->fds[j];
903                 close(fd);
904         }
905         rec->num_fds = 0;
906         rec->fds = NULL;
907 }
908
909 static int mess_parent_dgm_cleanup(void *private_data);
910 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
911
912 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
913 {
914         struct tevent_req *req;
915
916         req = background_job_send(
917                 msg, msg->event_ctx, msg, NULL, 0,
918                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
919                             60*15),
920                 mess_parent_dgm_cleanup, msg);
921         if (req == NULL) {
922                 return false;
923         }
924         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
925         return true;
926 }
927
928 static int mess_parent_dgm_cleanup(void *private_data)
929 {
930         int ret;
931
932         ret = messaging_dgm_wipe();
933         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
934                    ret ? strerror(ret) : "ok"));
935         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
936                            60*15);
937 }
938
939 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
940 {
941         struct messaging_context *msg = tevent_req_callback_data(
942                 req, struct messaging_context);
943         NTSTATUS status;
944
945         status = background_job_recv(req);
946         TALLOC_FREE(req);
947         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
948                   nt_errstr(status)));
949
950         req = background_job_send(
951                 msg, msg->event_ctx, msg, NULL, 0,
952                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
953                             60*15),
954                 mess_parent_dgm_cleanup, msg);
955         if (req == NULL) {
956                 DEBUG(1, ("background_job_send failed\n"));
957         }
958         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
959 }
960
961 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
962 {
963         int ret;
964
965         if (pid == 0) {
966                 ret = messaging_dgm_wipe();
967         } else {
968                 ret = messaging_dgm_cleanup(pid);
969         }
970
971         return ret;
972 }
973
974 struct tevent_context *messaging_tevent_context(
975         struct messaging_context *msg_ctx)
976 {
977         return msg_ctx->event_ctx;
978 }
979
980 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
981 {
982         return msg_ctx->names_db;
983 }
984
985 /** @} **/