messaging3: Add messaging_handler_send/recv
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57
58 struct messaging_callback {
59         struct messaging_callback *prev, *next;
60         uint32 msg_type;
61         void (*fn)(struct messaging_context *msg, void *private_data, 
62                    uint32_t msg_type, 
63                    struct server_id server_id, DATA_BLOB *data);
64         void *private_data;
65 };
66
67 struct messaging_context {
68         struct server_id id;
69         struct tevent_context *event_ctx;
70         struct messaging_callback *callbacks;
71
72         struct tevent_req **new_waiters;
73         unsigned num_new_waiters;
74
75         struct tevent_req **waiters;
76         unsigned num_waiters;
77
78         struct messaging_backend *remote;
79
80         struct server_id_db *names_db;
81 };
82
83 struct messaging_hdr {
84         uint32_t msg_type;
85         struct server_id dst;
86         struct server_id src;
87 };
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 /****************************************************************************
109  Register/replace a dispatch function for a particular message type.
110  JRA changed Dec 13 2006. Only one message handler now permitted per type.
111  *NOTE*: Dispatch functions must be able to cope with incoming
112  messages on an *odd* byte boundary.
113 ****************************************************************************/
114
115 struct msg_all {
116         struct messaging_context *msg_ctx;
117         int msg_type;
118         uint32 msg_flag;
119         const void *buf;
120         size_t len;
121         int n_sent;
122 };
123
124 /****************************************************************************
125  Send one of the messages for the broadcast.
126 ****************************************************************************/
127
128 static int traverse_fn(struct db_record *rec, const struct server_id *id,
129                        uint32_t msg_flags, void *state)
130 {
131         struct msg_all *msg_all = (struct msg_all *)state;
132         NTSTATUS status;
133
134         /* Don't send if the receiver hasn't registered an interest. */
135
136         if((msg_flags & msg_all->msg_flag) == 0) {
137                 return 0;
138         }
139
140         /* If the msg send fails because the pid was not found (i.e. smbd died), 
141          * the msg has already been deleted from the messages.tdb.*/
142
143         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
144                                     (const uint8_t *)msg_all->buf, msg_all->len);
145
146         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
147                 struct server_id_buf idbuf;
148
149                 /*
150                  * If the pid was not found delete the entry from
151                  * serverid.tdb
152                  */
153
154                 DEBUG(2, ("pid %s doesn't exist\n",
155                           server_id_str_buf(*id, &idbuf)));
156
157                 dbwrap_record_delete(rec);
158         }
159         msg_all->n_sent++;
160         return 0;
161 }
162
163 /**
164  * Send a message to all smbd processes.
165  *
166  * It isn't very efficient, but should be OK for the sorts of
167  * applications that use it. When we need efficient broadcast we can add
168  * it.
169  *
170  * @param n_sent Set to the number of messages sent.  This should be
171  * equal to the number of processes, but be careful for races.
172  *
173  * @retval True for success.
174  **/
175 bool message_send_all(struct messaging_context *msg_ctx,
176                       int msg_type,
177                       const void *buf, size_t len,
178                       int *n_sent)
179 {
180         struct msg_all msg_all;
181
182         msg_all.msg_type = msg_type;
183         if (msg_type < 0x100) {
184                 msg_all.msg_flag = FLAG_MSG_GENERAL;
185         } else if (msg_type > 0x100 && msg_type < 0x200) {
186                 msg_all.msg_flag = FLAG_MSG_NMBD;
187         } else if (msg_type > 0x200 && msg_type < 0x300) {
188                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
189         } else if (msg_type > 0x300 && msg_type < 0x400) {
190                 msg_all.msg_flag = FLAG_MSG_SMBD;
191         } else if (msg_type > 0x400 && msg_type < 0x600) {
192                 msg_all.msg_flag = FLAG_MSG_WINBIND;
193         } else if (msg_type > 4000 && msg_type < 5000) {
194                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
195         } else {
196                 return false;
197         }
198
199         msg_all.buf = buf;
200         msg_all.len = len;
201         msg_all.n_sent = 0;
202         msg_all.msg_ctx = msg_ctx;
203
204         serverid_traverse(traverse_fn, &msg_all);
205         if (n_sent)
206                 *n_sent = msg_all.n_sent;
207         return true;
208 }
209
210 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
211                               int *fds, size_t num_fds,
212                               void *private_data)
213 {
214         struct messaging_context *msg_ctx = talloc_get_type_abort(
215                 private_data, struct messaging_context);
216         const struct messaging_hdr *hdr;
217         struct server_id_buf idbuf;
218         struct messaging_rec rec;
219         int64_t fds64[MIN(num_fds, INT8_MAX)];
220         size_t i;
221
222         if (msg_len < sizeof(*hdr)) {
223                 for (i=0; i < num_fds; i++) {
224                         close(fds[i]);
225                 }
226                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
227                 return;
228         }
229
230         if (num_fds > INT8_MAX) {
231                 for (i=0; i < num_fds; i++) {
232                         close(fds[i]);
233                 }
234                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
235                 return;
236         }
237
238         /*
239          * "consume" the fds by copying them and setting
240          * the original variable to -1
241          */
242         for (i=0; i < num_fds; i++) {
243                 fds64[i] = fds[i];
244                 fds[i] = -1;
245         }
246
247         /*
248          * messages_dgm guarantees alignment, so we can cast here
249          */
250         hdr = (const struct messaging_hdr *)msg;
251
252         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
253                    __func__, (unsigned)hdr->msg_type,
254                    (unsigned)(msg_len - sizeof(*hdr)),
255                    (unsigned)num_fds,
256                    server_id_str_buf(hdr->src, &idbuf)));
257
258         rec = (struct messaging_rec) {
259                 .msg_version = MESSAGE_VERSION,
260                 .msg_type = hdr->msg_type,
261                 .src = hdr->src,
262                 .dest = hdr->dst,
263                 .buf.data = discard_const_p(uint8, msg) + sizeof(*hdr),
264                 .buf.length = msg_len - sizeof(*hdr),
265                 .num_fds = num_fds,
266                 .fds = fds64,
267         };
268
269         messaging_dispatch_rec(msg_ctx, &rec);
270 }
271
272 static int messaging_context_destructor(struct messaging_context *ctx)
273 {
274         unsigned i;
275
276         messaging_dgm_destroy();
277
278         for (i=0; i<ctx->num_new_waiters; i++) {
279                 if (ctx->new_waiters[i] != NULL) {
280                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
281                         ctx->new_waiters[i] = NULL;
282                 }
283         }
284         for (i=0; i<ctx->num_waiters; i++) {
285                 if (ctx->waiters[i] != NULL) {
286                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
287                         ctx->waiters[i] = NULL;
288                 }
289         }
290
291         return 0;
292 }
293
294 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
295                                          struct tevent_context *ev)
296 {
297         struct messaging_context *ctx;
298         NTSTATUS status;
299         int ret;
300
301         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
302                 return NULL;
303         }
304
305         ctx->id = procid_self();
306         ctx->event_ctx = ev;
307
308         sec_init();
309
310         ret = messaging_dgm_init(ctx->event_ctx, ctx->id,
311                                  lp_cache_directory(), sec_initial_uid(),
312                                  messaging_recv_cb, ctx);
313
314         if (ret != 0) {
315                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
316                 TALLOC_FREE(ctx);
317                 return NULL;
318         }
319
320         ctx->names_db = server_id_db_init(
321                 ctx, ctx->id, lp_cache_directory(), 0,
322                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
323         if (ctx->names_db == NULL) {
324                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
325                 TALLOC_FREE(ctx);
326                 return NULL;
327         }
328
329         talloc_set_destructor(ctx, messaging_context_destructor);
330
331         if (lp_clustering()) {
332                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
333
334                 if (!NT_STATUS_IS_OK(status)) {
335                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
336                                   nt_errstr(status)));
337                         TALLOC_FREE(ctx);
338                         return NULL;
339                 }
340         }
341         ctx->id.vnn = get_my_vnn();
342
343         messaging_register(ctx, NULL, MSG_PING, ping_message);
344
345         /* Register some debugging related messages */
346
347         register_msg_pool_usage(ctx);
348         register_dmalloc_msgs(ctx);
349         debug_register_msgs(ctx);
350
351         return ctx;
352 }
353
354 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
355 {
356         return msg_ctx->id;
357 }
358
359 /*
360  * re-init after a fork
361  */
362 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
363 {
364         NTSTATUS status;
365         int ret;
366
367         messaging_dgm_destroy();
368
369         msg_ctx->id = procid_self();
370
371         ret = messaging_dgm_init(msg_ctx->event_ctx, msg_ctx->id,
372                                  lp_cache_directory(), sec_initial_uid(),
373                                  messaging_recv_cb, msg_ctx);
374         if (ret != 0) {
375                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
376                 return map_nt_error_from_unix(ret);
377         }
378
379         TALLOC_FREE(msg_ctx->remote);
380
381         if (lp_clustering()) {
382                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
383                                               &msg_ctx->remote);
384
385                 if (!NT_STATUS_IS_OK(status)) {
386                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
387                                   nt_errstr(status)));
388                         return status;
389                 }
390         }
391
392         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
393
394         return NT_STATUS_OK;
395 }
396
397
398 /*
399  * Register a dispatch function for a particular message type. Allow multiple
400  * registrants
401 */
402 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
403                             void *private_data,
404                             uint32_t msg_type,
405                             void (*fn)(struct messaging_context *msg,
406                                        void *private_data, 
407                                        uint32_t msg_type, 
408                                        struct server_id server_id,
409                                        DATA_BLOB *data))
410 {
411         struct messaging_callback *cb;
412
413         DEBUG(5, ("Registering messaging pointer for type %u - "
414                   "private_data=%p\n",
415                   (unsigned)msg_type, private_data));
416
417         /*
418          * Only one callback per type
419          */
420
421         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
422                 /* we allow a second registration of the same message
423                    type if it has a different private pointer. This is
424                    needed in, for example, the internal notify code,
425                    which creates a new notify context for each tree
426                    connect, and expects to receive messages to each of
427                    them. */
428                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
429                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
430                                   (unsigned)msg_type, private_data));
431                         cb->fn = fn;
432                         cb->private_data = private_data;
433                         return NT_STATUS_OK;
434                 }
435         }
436
437         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
438                 return NT_STATUS_NO_MEMORY;
439         }
440
441         cb->msg_type = msg_type;
442         cb->fn = fn;
443         cb->private_data = private_data;
444
445         DLIST_ADD(msg_ctx->callbacks, cb);
446         return NT_STATUS_OK;
447 }
448
449 /*
450   De-register the function for a particular message type.
451 */
452 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
453                           void *private_data)
454 {
455         struct messaging_callback *cb, *next;
456
457         for (cb = ctx->callbacks; cb; cb = next) {
458                 next = cb->next;
459                 if ((cb->msg_type == msg_type)
460                     && (cb->private_data == private_data)) {
461                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
462                                   (unsigned)msg_type, private_data));
463                         DLIST_REMOVE(ctx->callbacks, cb);
464                         TALLOC_FREE(cb);
465                 }
466         }
467 }
468
469 /*
470   Send a message to a particular server
471 */
472 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
473                         struct server_id server, uint32_t msg_type,
474                         const DATA_BLOB *data)
475 {
476         struct iovec iov;
477
478         iov.iov_base = data->data;
479         iov.iov_len = data->length;
480
481         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
482 }
483
484 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
485                             struct server_id server, uint32_t msg_type,
486                             const uint8_t *buf, size_t len)
487 {
488         DATA_BLOB blob = data_blob_const(buf, len);
489         return messaging_send(msg_ctx, server, msg_type, &blob);
490 }
491
492 NTSTATUS messaging_send_iov_from(struct messaging_context *msg_ctx,
493                                  struct server_id src, struct server_id dst,
494                                  uint32_t msg_type,
495                                  const struct iovec *iov, int iovlen,
496                                  const int *fds, size_t num_fds)
497 {
498         int ret;
499         struct messaging_hdr hdr;
500         struct iovec iov2[iovlen+1];
501
502         if (server_id_is_disconnected(&dst)) {
503                 return NT_STATUS_INVALID_PARAMETER_MIX;
504         }
505
506         if (num_fds > INT8_MAX) {
507                 return NT_STATUS_INVALID_PARAMETER_MIX;
508         }
509
510         if (!procid_is_local(&dst)) {
511                 if (num_fds > 0) {
512                         return NT_STATUS_NOT_SUPPORTED;
513                 }
514
515                 ret = msg_ctx->remote->send_fn(src, dst,
516                                                msg_type, iov, iovlen,
517                                                NULL, 0,
518                                                msg_ctx->remote);
519                 if (ret != 0) {
520                         return map_nt_error_from_unix(ret);
521                 }
522                 return NT_STATUS_OK;
523         }
524
525         ZERO_STRUCT(hdr);
526         hdr = (struct messaging_hdr) {
527                 .msg_type = msg_type,
528                 .dst = dst,
529                 .src = src
530         };
531         iov2[0] = (struct iovec){ .iov_base = &hdr, .iov_len = sizeof(hdr) };
532         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
533
534         become_root();
535         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
536         unbecome_root();
537
538         if (ret != 0) {
539                 return map_nt_error_from_unix(ret);
540         }
541         return NT_STATUS_OK;
542 }
543
544 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
545                             struct server_id dst, uint32_t msg_type,
546                             const struct iovec *iov, int iovlen,
547                             const int *fds, size_t num_fds)
548 {
549         return messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
550                                        iov, iovlen, fds, num_fds);
551 }
552
553 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
554                                                struct messaging_rec *rec)
555 {
556         struct messaging_rec *result;
557         size_t fds_size = sizeof(int64_t) * rec->num_fds;
558
559         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
560                                       rec->buf.length + fds_size);
561         if (result == NULL) {
562                 return NULL;
563         }
564         *result = *rec;
565
566         /* Doesn't fail, see talloc_pooled_object */
567
568         result->buf.data = talloc_memdup(result, rec->buf.data,
569                                          rec->buf.length);
570
571         result->fds = NULL;
572         if (result->num_fds > 0) {
573                 result->fds = talloc_array(result, int64_t, result->num_fds);
574                 memcpy(result->fds, rec->fds, fds_size);
575         }
576
577         return result;
578 }
579
580 struct messaging_filtered_read_state {
581         struct tevent_context *ev;
582         struct messaging_context *msg_ctx;
583         void *tevent_handle;
584
585         bool (*filter)(struct messaging_rec *rec, void *private_data);
586         void *private_data;
587
588         struct messaging_rec *rec;
589 };
590
591 static void messaging_filtered_read_cleanup(struct tevent_req *req,
592                                             enum tevent_req_state req_state);
593
594 struct tevent_req *messaging_filtered_read_send(
595         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
596         struct messaging_context *msg_ctx,
597         bool (*filter)(struct messaging_rec *rec, void *private_data),
598         void *private_data)
599 {
600         struct tevent_req *req;
601         struct messaging_filtered_read_state *state;
602         size_t new_waiters_len;
603
604         req = tevent_req_create(mem_ctx, &state,
605                                 struct messaging_filtered_read_state);
606         if (req == NULL) {
607                 return NULL;
608         }
609         state->ev = ev;
610         state->msg_ctx = msg_ctx;
611         state->filter = filter;
612         state->private_data = private_data;
613
614         /*
615          * We have to defer the callback here, as we might be called from
616          * within a different tevent_context than state->ev
617          */
618         tevent_req_defer_callback(req, state->ev);
619
620         state->tevent_handle = messaging_dgm_register_tevent_context(
621                 state, ev);
622         if (tevent_req_nomem(state, req)) {
623                 return tevent_req_post(req, ev);
624         }
625
626         /*
627          * We add ourselves to the "new_waiters" array, not the "waiters"
628          * array. If we are called from within messaging_read_done,
629          * messaging_dispatch_rec will be in an active for-loop on
630          * "waiters". We must be careful not to mess with this array, because
631          * it could mean that a single event is being delivered twice.
632          */
633
634         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
635
636         if (new_waiters_len == msg_ctx->num_new_waiters) {
637                 struct tevent_req **tmp;
638
639                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
640                                      struct tevent_req *, new_waiters_len+1);
641                 if (tevent_req_nomem(tmp, req)) {
642                         return tevent_req_post(req, ev);
643                 }
644                 msg_ctx->new_waiters = tmp;
645         }
646
647         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
648         msg_ctx->num_new_waiters += 1;
649         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
650
651         return req;
652 }
653
654 static void messaging_filtered_read_cleanup(struct tevent_req *req,
655                                             enum tevent_req_state req_state)
656 {
657         struct messaging_filtered_read_state *state = tevent_req_data(
658                 req, struct messaging_filtered_read_state);
659         struct messaging_context *msg_ctx = state->msg_ctx;
660         unsigned i;
661
662         tevent_req_set_cleanup_fn(req, NULL);
663
664         TALLOC_FREE(state->tevent_handle);
665
666         /*
667          * Just set the [new_]waiters entry to NULL, be careful not to mess
668          * with the other "waiters" array contents. We are often called from
669          * within "messaging_dispatch_rec", which loops over
670          * "waiters". Messing with the "waiters" array will mess up that
671          * for-loop.
672          */
673
674         for (i=0; i<msg_ctx->num_waiters; i++) {
675                 if (msg_ctx->waiters[i] == req) {
676                         msg_ctx->waiters[i] = NULL;
677                         return;
678                 }
679         }
680
681         for (i=0; i<msg_ctx->num_new_waiters; i++) {
682                 if (msg_ctx->new_waiters[i] == req) {
683                         msg_ctx->new_waiters[i] = NULL;
684                         return;
685                 }
686         }
687 }
688
689 static void messaging_filtered_read_done(struct tevent_req *req,
690                                          struct messaging_rec *rec)
691 {
692         struct messaging_filtered_read_state *state = tevent_req_data(
693                 req, struct messaging_filtered_read_state);
694
695         state->rec = messaging_rec_dup(state, rec);
696         if (tevent_req_nomem(state->rec, req)) {
697                 return;
698         }
699         tevent_req_done(req);
700 }
701
702 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
703                                  struct messaging_rec **presult)
704 {
705         struct messaging_filtered_read_state *state = tevent_req_data(
706                 req, struct messaging_filtered_read_state);
707         int err;
708
709         if (tevent_req_is_unix_error(req, &err)) {
710                 tevent_req_received(req);
711                 return err;
712         }
713         *presult = talloc_move(mem_ctx, &state->rec);
714         return 0;
715 }
716
717 struct messaging_read_state {
718         uint32_t msg_type;
719         struct messaging_rec *rec;
720 };
721
722 static bool messaging_read_filter(struct messaging_rec *rec,
723                                   void *private_data);
724 static void messaging_read_done(struct tevent_req *subreq);
725
726 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
727                                        struct tevent_context *ev,
728                                        struct messaging_context *msg,
729                                        uint32_t msg_type)
730 {
731         struct tevent_req *req, *subreq;
732         struct messaging_read_state *state;
733
734         req = tevent_req_create(mem_ctx, &state,
735                                 struct messaging_read_state);
736         if (req == NULL) {
737                 return NULL;
738         }
739         state->msg_type = msg_type;
740
741         subreq = messaging_filtered_read_send(state, ev, msg,
742                                               messaging_read_filter, state);
743         if (tevent_req_nomem(subreq, req)) {
744                 return tevent_req_post(req, ev);
745         }
746         tevent_req_set_callback(subreq, messaging_read_done, req);
747         return req;
748 }
749
750 static bool messaging_read_filter(struct messaging_rec *rec,
751                                   void *private_data)
752 {
753         struct messaging_read_state *state = talloc_get_type_abort(
754                 private_data, struct messaging_read_state);
755
756         if (rec->num_fds != 0) {
757                 return false;
758         }
759
760         return rec->msg_type == state->msg_type;
761 }
762
763 static void messaging_read_done(struct tevent_req *subreq)
764 {
765         struct tevent_req *req = tevent_req_callback_data(
766                 subreq, struct tevent_req);
767         struct messaging_read_state *state = tevent_req_data(
768                 req, struct messaging_read_state);
769         int ret;
770
771         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
772         TALLOC_FREE(subreq);
773         if (tevent_req_error(req, ret)) {
774                 return;
775         }
776         tevent_req_done(req);
777 }
778
779 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
780                         struct messaging_rec **presult)
781 {
782         struct messaging_read_state *state = tevent_req_data(
783                 req, struct messaging_read_state);
784         int err;
785
786         if (tevent_req_is_unix_error(req, &err)) {
787                 return err;
788         }
789         if (presult != NULL) {
790                 *presult = talloc_move(mem_ctx, &state->rec);
791         }
792         return 0;
793 }
794
795 struct messaging_handler_state {
796         struct tevent_context *ev;
797         struct messaging_context *msg_ctx;
798         uint32_t msg_type;
799         bool (*handler)(struct messaging_context *msg_ctx,
800                         struct messaging_rec **rec, void *private_data);
801         void *private_data;
802 };
803
804 static void messaging_handler_got_msg(struct tevent_req *subreq);
805
806 struct tevent_req *messaging_handler_send(
807         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
808         struct messaging_context *msg_ctx, uint32_t msg_type,
809         bool (*handler)(struct messaging_context *msg_ctx,
810                         struct messaging_rec **rec, void *private_data),
811         void *private_data)
812 {
813         struct tevent_req *req, *subreq;
814         struct messaging_handler_state *state;
815
816         req = tevent_req_create(mem_ctx, &state,
817                                 struct messaging_handler_state);
818         if (req == NULL) {
819                 return NULL;
820         }
821         state->ev = ev;
822         state->msg_ctx = msg_ctx;
823         state->msg_type = msg_type;
824         state->handler = handler;
825         state->private_data = private_data;
826
827         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
828                                      state->msg_type);
829         if (tevent_req_nomem(subreq, req)) {
830                 return tevent_req_post(req, ev);
831         }
832         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
833         return req;
834 }
835
836 static void messaging_handler_got_msg(struct tevent_req *subreq)
837 {
838         struct tevent_req *req = tevent_req_callback_data(
839                 subreq, struct tevent_req);
840         struct messaging_handler_state *state = tevent_req_data(
841                 req, struct messaging_handler_state);
842         struct messaging_rec *rec;
843         int ret;
844         bool ok;
845
846         ret = messaging_read_recv(subreq, state, &rec);
847         TALLOC_FREE(subreq);
848         if (tevent_req_error(req, ret)) {
849                 return;
850         }
851
852         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
853                                      state->msg_type);
854         if (tevent_req_nomem(subreq, req)) {
855                 return;
856         }
857         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
858
859         ok = state->handler(state->msg_ctx, &rec, state->private_data);
860         TALLOC_FREE(rec);
861         if (ok) {
862                 /*
863                  * Next round
864                  */
865                 return;
866         }
867         TALLOC_FREE(subreq);
868         tevent_req_done(req);
869 }
870
871 int messaging_handler_recv(struct tevent_req *req)
872 {
873         return tevent_req_simple_recv_unix(req);
874 }
875
876 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
877 {
878         if (msg_ctx->num_new_waiters == 0) {
879                 return true;
880         }
881
882         if (talloc_array_length(msg_ctx->waiters) <
883             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
884                 struct tevent_req **tmp;
885                 tmp = talloc_realloc(
886                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
887                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
888                 if (tmp == NULL) {
889                         DEBUG(1, ("%s: talloc failed\n", __func__));
890                         return false;
891                 }
892                 msg_ctx->waiters = tmp;
893         }
894
895         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
896                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
897
898         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
899         msg_ctx->num_new_waiters = 0;
900
901         return true;
902 }
903
904 /*
905   Dispatch one messaging_rec
906 */
907 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
908                             struct messaging_rec *rec)
909 {
910         struct messaging_callback *cb, *next;
911         unsigned i;
912         size_t j;
913
914         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
915                 next = cb->next;
916                 if (cb->msg_type != rec->msg_type) {
917                         continue;
918                 }
919
920                 /*
921                  * the old style callbacks don't support fd passing
922                  */
923                 for (j=0; j < rec->num_fds; j++) {
924                         int fd = rec->fds[j];
925                         close(fd);
926                 }
927                 rec->num_fds = 0;
928                 rec->fds = NULL;
929
930                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
931                        rec->src, &rec->buf);
932
933                 /*
934                  * we continue looking for matching messages after finding
935                  * one. This matters for subsystems like the internal notify
936                  * code which register more than one handler for the same
937                  * message type
938                  */
939         }
940
941         if (!messaging_append_new_waiters(msg_ctx)) {
942                 for (j=0; j < rec->num_fds; j++) {
943                         int fd = rec->fds[j];
944                         close(fd);
945                 }
946                 rec->num_fds = 0;
947                 rec->fds = NULL;
948                 return;
949         }
950
951         i = 0;
952         while (i < msg_ctx->num_waiters) {
953                 struct tevent_req *req;
954                 struct messaging_filtered_read_state *state;
955
956                 req = msg_ctx->waiters[i];
957                 if (req == NULL) {
958                         /*
959                          * This got cleaned up. In the meantime,
960                          * move everything down one. We need
961                          * to keep the order of waiters, as
962                          * other code may depend on this.
963                          */
964                         if (i < msg_ctx->num_waiters - 1) {
965                                 memmove(&msg_ctx->waiters[i],
966                                         &msg_ctx->waiters[i+1],
967                                         sizeof(struct tevent_req *) *
968                                             (msg_ctx->num_waiters - i - 1));
969                         }
970                         msg_ctx->num_waiters -= 1;
971                         continue;
972                 }
973
974                 state = tevent_req_data(
975                         req, struct messaging_filtered_read_state);
976                 if (state->filter(rec, state->private_data)) {
977                         messaging_filtered_read_done(req, rec);
978
979                         /*
980                          * Only the first one gets the fd-array
981                          */
982                         rec->num_fds = 0;
983                         rec->fds = NULL;
984                 }
985
986                 i += 1;
987         }
988
989         /*
990          * If the fd-array isn't used, just close it.
991          */
992         for (j=0; j < rec->num_fds; j++) {
993                 int fd = rec->fds[j];
994                 close(fd);
995         }
996         rec->num_fds = 0;
997         rec->fds = NULL;
998 }
999
1000 static int mess_parent_dgm_cleanup(void *private_data);
1001 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1002
1003 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1004 {
1005         struct tevent_req *req;
1006
1007         req = background_job_send(
1008                 msg, msg->event_ctx, msg, NULL, 0,
1009                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1010                             60*15),
1011                 mess_parent_dgm_cleanup, msg);
1012         if (req == NULL) {
1013                 return false;
1014         }
1015         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1016         return true;
1017 }
1018
1019 static int mess_parent_dgm_cleanup(void *private_data)
1020 {
1021         int ret;
1022
1023         ret = messaging_dgm_wipe();
1024         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1025                    ret ? strerror(ret) : "ok"));
1026         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1027                            60*15);
1028 }
1029
1030 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1031 {
1032         struct messaging_context *msg = tevent_req_callback_data(
1033                 req, struct messaging_context);
1034         NTSTATUS status;
1035
1036         status = background_job_recv(req);
1037         TALLOC_FREE(req);
1038         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1039                   nt_errstr(status)));
1040
1041         req = background_job_send(
1042                 msg, msg->event_ctx, msg, NULL, 0,
1043                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1044                             60*15),
1045                 mess_parent_dgm_cleanup, msg);
1046         if (req == NULL) {
1047                 DEBUG(1, ("background_job_send failed\n"));
1048         }
1049         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1050 }
1051
1052 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1053 {
1054         int ret;
1055
1056         if (pid == 0) {
1057                 ret = messaging_dgm_wipe();
1058         } else {
1059                 ret = messaging_dgm_cleanup(pid);
1060         }
1061
1062         return ret;
1063 }
1064
1065 struct tevent_context *messaging_tevent_context(
1066         struct messaging_context *msg_ctx)
1067 {
1068         return msg_ctx->event_ctx;
1069 }
1070
1071 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1072 {
1073         return msg_ctx->names_db;
1074 }
1075
1076 /** @} **/