messaging3: Move sec_init() call out of messaging_dgm_init()
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54
55 struct messaging_callback {
56         struct messaging_callback *prev, *next;
57         uint32 msg_type;
58         void (*fn)(struct messaging_context *msg, void *private_data, 
59                    uint32_t msg_type, 
60                    struct server_id server_id, DATA_BLOB *data);
61         void *private_data;
62 };
63
64 struct messaging_context {
65         struct server_id id;
66         struct tevent_context *event_ctx;
67         struct messaging_callback *callbacks;
68
69         struct tevent_req **new_waiters;
70         unsigned num_new_waiters;
71
72         struct tevent_req **waiters;
73         unsigned num_waiters;
74
75         struct messaging_dgm_context *local;
76
77         struct messaging_backend *remote;
78
79         bool *have_context;
80 };
81
82 static int messaging_context_destructor(struct messaging_context *msg_ctx);
83
84 /****************************************************************************
85  A useful function for testing the message system.
86 ****************************************************************************/
87
88 static void ping_message(struct messaging_context *msg_ctx,
89                          void *private_data,
90                          uint32_t msg_type,
91                          struct server_id src,
92                          DATA_BLOB *data)
93 {
94         struct server_id_buf idbuf;
95
96         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
97                   server_id_str_buf(src, &idbuf), (int)data->length,
98                   data->data ? (char *)data->data : ""));
99
100         messaging_send(msg_ctx, src, MSG_PONG, data);
101 }
102
103 /****************************************************************************
104  Register/replace a dispatch function for a particular message type.
105  JRA changed Dec 13 2006. Only one message handler now permitted per type.
106  *NOTE*: Dispatch functions must be able to cope with incoming
107  messages on an *odd* byte boundary.
108 ****************************************************************************/
109
110 struct msg_all {
111         struct messaging_context *msg_ctx;
112         int msg_type;
113         uint32 msg_flag;
114         const void *buf;
115         size_t len;
116         int n_sent;
117 };
118
119 /****************************************************************************
120  Send one of the messages for the broadcast.
121 ****************************************************************************/
122
123 static int traverse_fn(struct db_record *rec, const struct server_id *id,
124                        uint32_t msg_flags, void *state)
125 {
126         struct msg_all *msg_all = (struct msg_all *)state;
127         NTSTATUS status;
128
129         /* Don't send if the receiver hasn't registered an interest. */
130
131         if((msg_flags & msg_all->msg_flag) == 0) {
132                 return 0;
133         }
134
135         /* If the msg send fails because the pid was not found (i.e. smbd died), 
136          * the msg has already been deleted from the messages.tdb.*/
137
138         status = messaging_send_buf(msg_all->msg_ctx, *id, msg_all->msg_type,
139                                     (const uint8_t *)msg_all->buf, msg_all->len);
140
141         if (NT_STATUS_EQUAL(status, NT_STATUS_INVALID_HANDLE)) {
142                 struct server_id_buf idbuf;
143
144                 /*
145                  * If the pid was not found delete the entry from
146                  * serverid.tdb
147                  */
148
149                 DEBUG(2, ("pid %s doesn't exist\n",
150                           server_id_str_buf(*id, &idbuf)));
151
152                 dbwrap_record_delete(rec);
153         }
154         msg_all->n_sent++;
155         return 0;
156 }
157
158 /**
159  * Send a message to all smbd processes.
160  *
161  * It isn't very efficient, but should be OK for the sorts of
162  * applications that use it. When we need efficient broadcast we can add
163  * it.
164  *
165  * @param n_sent Set to the number of messages sent.  This should be
166  * equal to the number of processes, but be careful for races.
167  *
168  * @retval True for success.
169  **/
170 bool message_send_all(struct messaging_context *msg_ctx,
171                       int msg_type,
172                       const void *buf, size_t len,
173                       int *n_sent)
174 {
175         struct msg_all msg_all;
176
177         msg_all.msg_type = msg_type;
178         if (msg_type < 0x100) {
179                 msg_all.msg_flag = FLAG_MSG_GENERAL;
180         } else if (msg_type > 0x100 && msg_type < 0x200) {
181                 msg_all.msg_flag = FLAG_MSG_NMBD;
182         } else if (msg_type > 0x200 && msg_type < 0x300) {
183                 msg_all.msg_flag = FLAG_MSG_PRINT_GENERAL;
184         } else if (msg_type > 0x300 && msg_type < 0x400) {
185                 msg_all.msg_flag = FLAG_MSG_SMBD;
186         } else if (msg_type > 0x400 && msg_type < 0x600) {
187                 msg_all.msg_flag = FLAG_MSG_WINBIND;
188         } else if (msg_type > 4000 && msg_type < 5000) {
189                 msg_all.msg_flag = FLAG_MSG_DBWRAP;
190         } else {
191                 return false;
192         }
193
194         msg_all.buf = buf;
195         msg_all.len = len;
196         msg_all.n_sent = 0;
197         msg_all.msg_ctx = msg_ctx;
198
199         serverid_traverse(traverse_fn, &msg_all);
200         if (n_sent)
201                 *n_sent = msg_all.n_sent;
202         return true;
203 }
204
205 static void messaging_recv_cb(int msg_type,
206                               struct server_id src, struct server_id dst,
207                               const uint8_t *msg, size_t msg_len,
208                               void *private_data)
209 {
210         struct messaging_context *msg_ctx = talloc_get_type_abort(
211                 private_data, struct messaging_context);
212         struct messaging_rec rec;
213
214         rec = (struct messaging_rec) {
215                 .msg_version = MESSAGE_VERSION,
216                 .msg_type = msg_type,
217                 .src = src,
218                 .dest = dst,
219                 .buf.data = discard_const_p(uint8, msg),
220                 .buf.length = msg_len
221         };
222
223         messaging_dispatch_rec(msg_ctx, &rec);
224 }
225
226 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
227                                          struct tevent_context *ev)
228 {
229         struct messaging_context *ctx;
230         NTSTATUS status;
231         int ret;
232         static bool have_context = false;
233
234         if (have_context) {
235                 DEBUG(0, ("No two messaging contexts per process\n"));
236                 return NULL;
237         }
238
239
240         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
241                 return NULL;
242         }
243
244         ctx->id = procid_self();
245         ctx->event_ctx = ev;
246         ctx->have_context = &have_context;
247
248         sec_init();
249
250         ret = messaging_dgm_init(ctx, ctx->event_ctx, ctx->id,
251                                  messaging_recv_cb, ctx, &ctx->local);
252
253         if (ret != 0) {
254                 DEBUG(2, ("messaging_dgm_init failed: %s\n", strerror(ret)));
255                 TALLOC_FREE(ctx);
256                 return NULL;
257         }
258
259         if (lp_clustering()) {
260                 status = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
261
262                 if (!NT_STATUS_IS_OK(status)) {
263                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
264                                   nt_errstr(status)));
265                         TALLOC_FREE(ctx);
266                         return NULL;
267                 }
268         }
269         ctx->id.vnn = get_my_vnn();
270
271         messaging_register(ctx, NULL, MSG_PING, ping_message);
272
273         /* Register some debugging related messages */
274
275         register_msg_pool_usage(ctx);
276         register_dmalloc_msgs(ctx);
277         debug_register_msgs(ctx);
278
279         have_context = true;
280         talloc_set_destructor(ctx, messaging_context_destructor);
281
282         return ctx;
283 }
284
285 static int messaging_context_destructor(struct messaging_context *msg_ctx)
286 {
287         SMB_ASSERT(*msg_ctx->have_context);
288         *msg_ctx->have_context = false;
289         return 0;
290 }
291
292 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
293 {
294         return msg_ctx->id;
295 }
296
297 /*
298  * re-init after a fork
299  */
300 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
301 {
302         NTSTATUS status;
303         int ret;
304
305         TALLOC_FREE(msg_ctx->local);
306
307         msg_ctx->id = procid_self();
308
309         ret = messaging_dgm_init(msg_ctx, msg_ctx->event_ctx,
310                                  msg_ctx->id, messaging_recv_cb, msg_ctx,
311                                  &msg_ctx->local);
312         if (ret != 0) {
313                 DEBUG(0, ("messaging_dgm_init failed: %s\n", strerror(errno)));
314                 return map_nt_error_from_unix(ret);
315         }
316
317         TALLOC_FREE(msg_ctx->remote);
318
319         if (lp_clustering()) {
320                 status = messaging_ctdbd_init(msg_ctx, msg_ctx,
321                                               &msg_ctx->remote);
322
323                 if (!NT_STATUS_IS_OK(status)) {
324                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
325                                   nt_errstr(status)));
326                         return status;
327                 }
328         }
329
330         return NT_STATUS_OK;
331 }
332
333
334 /*
335  * Register a dispatch function for a particular message type. Allow multiple
336  * registrants
337 */
338 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
339                             void *private_data,
340                             uint32_t msg_type,
341                             void (*fn)(struct messaging_context *msg,
342                                        void *private_data, 
343                                        uint32_t msg_type, 
344                                        struct server_id server_id,
345                                        DATA_BLOB *data))
346 {
347         struct messaging_callback *cb;
348
349         DEBUG(5, ("Registering messaging pointer for type %u - "
350                   "private_data=%p\n",
351                   (unsigned)msg_type, private_data));
352
353         /*
354          * Only one callback per type
355          */
356
357         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
358                 /* we allow a second registration of the same message
359                    type if it has a different private pointer. This is
360                    needed in, for example, the internal notify code,
361                    which creates a new notify context for each tree
362                    connect, and expects to receive messages to each of
363                    them. */
364                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
365                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
366                                   (unsigned)msg_type, private_data));
367                         cb->fn = fn;
368                         cb->private_data = private_data;
369                         return NT_STATUS_OK;
370                 }
371         }
372
373         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
374                 return NT_STATUS_NO_MEMORY;
375         }
376
377         cb->msg_type = msg_type;
378         cb->fn = fn;
379         cb->private_data = private_data;
380
381         DLIST_ADD(msg_ctx->callbacks, cb);
382         return NT_STATUS_OK;
383 }
384
385 /*
386   De-register the function for a particular message type.
387 */
388 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
389                           void *private_data)
390 {
391         struct messaging_callback *cb, *next;
392
393         for (cb = ctx->callbacks; cb; cb = next) {
394                 next = cb->next;
395                 if ((cb->msg_type == msg_type)
396                     && (cb->private_data == private_data)) {
397                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
398                                   (unsigned)msg_type, private_data));
399                         DLIST_REMOVE(ctx->callbacks, cb);
400                         TALLOC_FREE(cb);
401                 }
402         }
403 }
404
405 static bool messaging_is_self_send(const struct messaging_context *msg_ctx,
406                                    const struct server_id *dst)
407 {
408         return ((msg_ctx->id.vnn == dst->vnn) &&
409                 (msg_ctx->id.pid == dst->pid));
410 }
411
412 /*
413   Send a message to a particular server
414 */
415 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
416                         struct server_id server, uint32_t msg_type,
417                         const DATA_BLOB *data)
418 {
419         struct iovec iov;
420
421         iov.iov_base = data->data;
422         iov.iov_len = data->length;
423
424         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1);
425 }
426
427 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
428                             struct server_id server, uint32_t msg_type,
429                             const uint8_t *buf, size_t len)
430 {
431         DATA_BLOB blob = data_blob_const(buf, len);
432         return messaging_send(msg_ctx, server, msg_type, &blob);
433 }
434
435 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
436                             struct server_id server, uint32_t msg_type,
437                             const struct iovec *iov, int iovlen)
438 {
439         int ret;
440
441         if (server_id_is_disconnected(&server)) {
442                 return NT_STATUS_INVALID_PARAMETER_MIX;
443         }
444
445         if (!procid_is_local(&server)) {
446                 ret = msg_ctx->remote->send_fn(msg_ctx->id, server,
447                                                msg_type, iov, iovlen,
448                                                msg_ctx->remote);
449                 if (ret != 0) {
450                         return map_nt_error_from_unix(ret);
451                 }
452                 return NT_STATUS_OK;
453         }
454
455         if (messaging_is_self_send(msg_ctx, &server)) {
456                 struct messaging_rec rec;
457                 uint8_t *buf;
458
459                 buf = iov_buf(talloc_tos(), iov, iovlen);
460                 if (buf == NULL) {
461                         return NT_STATUS_NO_MEMORY;
462                 }
463
464                 rec.msg_version = MESSAGE_VERSION;
465                 rec.msg_type = msg_type & MSG_TYPE_MASK;
466                 rec.dest = server;
467                 rec.src = msg_ctx->id;
468                 rec.buf = data_blob_const(buf, talloc_get_size(buf));
469                 messaging_dispatch_rec(msg_ctx, &rec);
470                 TALLOC_FREE(buf);
471                 return NT_STATUS_OK;
472         }
473
474         ret = messaging_dgm_send(msg_ctx->local, msg_ctx->id, server, msg_type,
475                                  iov, iovlen);
476         if (ret != 0) {
477                 return map_nt_error_from_unix(ret);
478         }
479         return NT_STATUS_OK;
480 }
481
482 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
483                                                struct messaging_rec *rec)
484 {
485         struct messaging_rec *result;
486
487         result = talloc_pooled_object(mem_ctx, struct messaging_rec,
488                                       1, rec->buf.length);
489         if (result == NULL) {
490                 return NULL;
491         }
492         *result = *rec;
493
494         /* Doesn't fail, see talloc_pooled_object */
495
496         result->buf.data = talloc_memdup(result, rec->buf.data,
497                                          rec->buf.length);
498         return result;
499 }
500
501 struct messaging_filtered_read_state {
502         struct tevent_context *ev;
503         struct messaging_context *msg_ctx;
504         void *tevent_handle;
505
506         bool (*filter)(struct messaging_rec *rec, void *private_data);
507         void *private_data;
508
509         struct messaging_rec *rec;
510 };
511
512 static void messaging_filtered_read_cleanup(struct tevent_req *req,
513                                             enum tevent_req_state req_state);
514
515 struct tevent_req *messaging_filtered_read_send(
516         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
517         struct messaging_context *msg_ctx,
518         bool (*filter)(struct messaging_rec *rec, void *private_data),
519         void *private_data)
520 {
521         struct tevent_req *req;
522         struct messaging_filtered_read_state *state;
523         size_t new_waiters_len;
524
525         req = tevent_req_create(mem_ctx, &state,
526                                 struct messaging_filtered_read_state);
527         if (req == NULL) {
528                 return NULL;
529         }
530         state->ev = ev;
531         state->msg_ctx = msg_ctx;
532         state->filter = filter;
533         state->private_data = private_data;
534
535         /*
536          * We have to defer the callback here, as we might be called from
537          * within a different tevent_context than state->ev
538          */
539         tevent_req_defer_callback(req, state->ev);
540
541         state->tevent_handle = messaging_dgm_register_tevent_context(
542                 state, msg_ctx->local, ev);
543         if (tevent_req_nomem(state, req)) {
544                 return tevent_req_post(req, ev);
545         }
546
547         /*
548          * We add ourselves to the "new_waiters" array, not the "waiters"
549          * array. If we are called from within messaging_read_done,
550          * messaging_dispatch_rec will be in an active for-loop on
551          * "waiters". We must be careful not to mess with this array, because
552          * it could mean that a single event is being delivered twice.
553          */
554
555         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
556
557         if (new_waiters_len == msg_ctx->num_new_waiters) {
558                 struct tevent_req **tmp;
559
560                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
561                                      struct tevent_req *, new_waiters_len+1);
562                 if (tevent_req_nomem(tmp, req)) {
563                         return tevent_req_post(req, ev);
564                 }
565                 msg_ctx->new_waiters = tmp;
566         }
567
568         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
569         msg_ctx->num_new_waiters += 1;
570         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
571
572         return req;
573 }
574
575 static void messaging_filtered_read_cleanup(struct tevent_req *req,
576                                             enum tevent_req_state req_state)
577 {
578         struct messaging_filtered_read_state *state = tevent_req_data(
579                 req, struct messaging_filtered_read_state);
580         struct messaging_context *msg_ctx = state->msg_ctx;
581         unsigned i;
582
583         tevent_req_set_cleanup_fn(req, NULL);
584
585         TALLOC_FREE(state->tevent_handle);
586
587         /*
588          * Just set the [new_]waiters entry to NULL, be careful not to mess
589          * with the other "waiters" array contents. We are often called from
590          * within "messaging_dispatch_rec", which loops over
591          * "waiters". Messing with the "waiters" array will mess up that
592          * for-loop.
593          */
594
595         for (i=0; i<msg_ctx->num_waiters; i++) {
596                 if (msg_ctx->waiters[i] == req) {
597                         msg_ctx->waiters[i] = NULL;
598                         return;
599                 }
600         }
601
602         for (i=0; i<msg_ctx->num_new_waiters; i++) {
603                 if (msg_ctx->new_waiters[i] == req) {
604                         msg_ctx->new_waiters[i] = NULL;
605                         return;
606                 }
607         }
608 }
609
610 static void messaging_filtered_read_done(struct tevent_req *req,
611                                          struct messaging_rec *rec)
612 {
613         struct messaging_filtered_read_state *state = tevent_req_data(
614                 req, struct messaging_filtered_read_state);
615
616         state->rec = messaging_rec_dup(state, rec);
617         if (tevent_req_nomem(state->rec, req)) {
618                 return;
619         }
620         tevent_req_done(req);
621 }
622
623 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
624                                  struct messaging_rec **presult)
625 {
626         struct messaging_filtered_read_state *state = tevent_req_data(
627                 req, struct messaging_filtered_read_state);
628         int err;
629
630         if (tevent_req_is_unix_error(req, &err)) {
631                 tevent_req_received(req);
632                 return err;
633         }
634         *presult = talloc_move(mem_ctx, &state->rec);
635         return 0;
636 }
637
638 struct messaging_read_state {
639         uint32_t msg_type;
640         struct messaging_rec *rec;
641 };
642
643 static bool messaging_read_filter(struct messaging_rec *rec,
644                                   void *private_data);
645 static void messaging_read_done(struct tevent_req *subreq);
646
647 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
648                                        struct tevent_context *ev,
649                                        struct messaging_context *msg,
650                                        uint32_t msg_type)
651 {
652         struct tevent_req *req, *subreq;
653         struct messaging_read_state *state;
654
655         req = tevent_req_create(mem_ctx, &state,
656                                 struct messaging_read_state);
657         if (req == NULL) {
658                 return NULL;
659         }
660         state->msg_type = msg_type;
661
662         subreq = messaging_filtered_read_send(state, ev, msg,
663                                               messaging_read_filter, state);
664         if (tevent_req_nomem(subreq, req)) {
665                 return tevent_req_post(req, ev);
666         }
667         tevent_req_set_callback(subreq, messaging_read_done, req);
668         return req;
669 }
670
671 static bool messaging_read_filter(struct messaging_rec *rec,
672                                   void *private_data)
673 {
674         struct messaging_read_state *state = talloc_get_type_abort(
675                 private_data, struct messaging_read_state);
676
677         return rec->msg_type == state->msg_type;
678 }
679
680 static void messaging_read_done(struct tevent_req *subreq)
681 {
682         struct tevent_req *req = tevent_req_callback_data(
683                 subreq, struct tevent_req);
684         struct messaging_read_state *state = tevent_req_data(
685                 req, struct messaging_read_state);
686         int ret;
687
688         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
689         TALLOC_FREE(subreq);
690         if (tevent_req_error(req, ret)) {
691                 return;
692         }
693         tevent_req_done(req);
694 }
695
696 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
697                         struct messaging_rec **presult)
698 {
699         struct messaging_read_state *state = tevent_req_data(
700                 req, struct messaging_read_state);
701         int err;
702
703         if (tevent_req_is_unix_error(req, &err)) {
704                 return err;
705         }
706         if (presult != NULL) {
707                 *presult = talloc_move(mem_ctx, &state->rec);
708         }
709         return 0;
710 }
711
712 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
713 {
714         if (msg_ctx->num_new_waiters == 0) {
715                 return true;
716         }
717
718         if (talloc_array_length(msg_ctx->waiters) <
719             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
720                 struct tevent_req **tmp;
721                 tmp = talloc_realloc(
722                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
723                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
724                 if (tmp == NULL) {
725                         DEBUG(1, ("%s: talloc failed\n", __func__));
726                         return false;
727                 }
728                 msg_ctx->waiters = tmp;
729         }
730
731         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
732                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
733
734         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
735         msg_ctx->num_new_waiters = 0;
736
737         return true;
738 }
739
740 struct messaging_defer_callback_state {
741         struct messaging_context *msg_ctx;
742         struct messaging_rec *rec;
743         void (*fn)(struct messaging_context *msg, void *private_data,
744                    uint32_t msg_type, struct server_id server_id,
745                    DATA_BLOB *data);
746         void *private_data;
747 };
748
749 static void messaging_defer_callback_trigger(struct tevent_context *ev,
750                                              struct tevent_immediate *im,
751                                              void *private_data);
752
753 static void messaging_defer_callback(
754         struct messaging_context *msg_ctx, struct messaging_rec *rec,
755         void (*fn)(struct messaging_context *msg, void *private_data,
756                    uint32_t msg_type, struct server_id server_id,
757                    DATA_BLOB *data),
758         void *private_data)
759 {
760         struct messaging_defer_callback_state *state;
761         struct tevent_immediate *im;
762
763         state = talloc(msg_ctx, struct messaging_defer_callback_state);
764         if (state == NULL) {
765                 DEBUG(1, ("talloc failed\n"));
766                 return;
767         }
768         state->msg_ctx = msg_ctx;
769         state->fn = fn;
770         state->private_data = private_data;
771
772         state->rec = messaging_rec_dup(state, rec);
773         if (state->rec == NULL) {
774                 DEBUG(1, ("talloc failed\n"));
775                 TALLOC_FREE(state);
776                 return;
777         }
778
779         im = tevent_create_immediate(state);
780         if (im == NULL) {
781                 DEBUG(1, ("tevent_create_immediate failed\n"));
782                 TALLOC_FREE(state);
783                 return;
784         }
785         tevent_schedule_immediate(im, msg_ctx->event_ctx,
786                                   messaging_defer_callback_trigger, state);
787 }
788
789 static void messaging_defer_callback_trigger(struct tevent_context *ev,
790                                              struct tevent_immediate *im,
791                                              void *private_data)
792 {
793         struct messaging_defer_callback_state *state = talloc_get_type_abort(
794                 private_data, struct messaging_defer_callback_state);
795         struct messaging_rec *rec = state->rec;
796
797         state->fn(state->msg_ctx, state->private_data, rec->msg_type, rec->src,
798                   &rec->buf);
799         TALLOC_FREE(state);
800 }
801
802 /*
803   Dispatch one messaging_rec
804 */
805 void messaging_dispatch_rec(struct messaging_context *msg_ctx,
806                             struct messaging_rec *rec)
807 {
808         struct messaging_callback *cb, *next;
809         unsigned i;
810
811         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
812                 next = cb->next;
813                 if (cb->msg_type != rec->msg_type) {
814                         continue;
815                 }
816
817                 if (messaging_is_self_send(msg_ctx, &rec->dest)) {
818                         /*
819                          * This is a self-send. We are called here from
820                          * messaging_send(), and we don't want to directly
821                          * recurse into the callback but go via a
822                          * tevent_loop_once
823                          */
824                         messaging_defer_callback(msg_ctx, rec, cb->fn,
825                                                  cb->private_data);
826                 } else {
827                         /*
828                          * This comes from a different process. we are called
829                          * from the event loop, so we should call back
830                          * directly.
831                          */
832                         cb->fn(msg_ctx, cb->private_data, rec->msg_type,
833                                rec->src, &rec->buf);
834                 }
835                 /*
836                  * we continue looking for matching messages after finding
837                  * one. This matters for subsystems like the internal notify
838                  * code which register more than one handler for the same
839                  * message type
840                  */
841         }
842
843         if (!messaging_append_new_waiters(msg_ctx)) {
844                 return;
845         }
846
847         i = 0;
848         while (i < msg_ctx->num_waiters) {
849                 struct tevent_req *req;
850                 struct messaging_filtered_read_state *state;
851
852                 req = msg_ctx->waiters[i];
853                 if (req == NULL) {
854                         /*
855                          * This got cleaned up. In the meantime,
856                          * move everything down one. We need
857                          * to keep the order of waiters, as
858                          * other code may depend on this.
859                          */
860                         if (i < msg_ctx->num_waiters - 1) {
861                                 memmove(&msg_ctx->waiters[i],
862                                         &msg_ctx->waiters[i+1],
863                                         sizeof(struct tevent_req *) *
864                                             (msg_ctx->num_waiters - i - 1));
865                         }
866                         msg_ctx->num_waiters -= 1;
867                         continue;
868                 }
869
870                 state = tevent_req_data(
871                         req, struct messaging_filtered_read_state);
872                 if (state->filter(rec, state->private_data)) {
873                         messaging_filtered_read_done(req, rec);
874                 }
875
876                 i += 1;
877         }
878 }
879
880 static int mess_parent_dgm_cleanup(void *private_data);
881 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
882
883 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
884 {
885         struct tevent_req *req;
886
887         req = background_job_send(
888                 msg, msg->event_ctx, msg, NULL, 0,
889                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
890                             60*15),
891                 mess_parent_dgm_cleanup, msg);
892         if (req == NULL) {
893                 return false;
894         }
895         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
896         return true;
897 }
898
899 static int mess_parent_dgm_cleanup(void *private_data)
900 {
901         struct messaging_context *msg_ctx = talloc_get_type_abort(
902                 private_data, struct messaging_context);
903         int ret;
904
905         ret = messaging_dgm_wipe(msg_ctx->local);
906         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
907                    ret ? strerror(ret) : "ok"));
908         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
909                            60*15);
910 }
911
912 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
913 {
914         struct messaging_context *msg = tevent_req_callback_data(
915                 req, struct messaging_context);
916         NTSTATUS status;
917
918         status = background_job_recv(req);
919         TALLOC_FREE(req);
920         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
921                   nt_errstr(status)));
922
923         req = background_job_send(
924                 msg, msg->event_ctx, msg, NULL, 0,
925                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
926                             60*15),
927                 mess_parent_dgm_cleanup, msg);
928         if (req == NULL) {
929                 DEBUG(1, ("background_job_send failed\n"));
930         }
931         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
932 }
933
934 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
935 {
936         int ret;
937
938         if (pid == 0) {
939                 ret = messaging_dgm_wipe(msg_ctx->local);
940         } else {
941                 ret = messaging_dgm_cleanup(msg_ctx->local, pid);
942         }
943
944         return ret;
945 }
946
947 struct tevent_context *messaging_tevent_context(
948         struct messaging_context *msg_ctx)
949 {
950         return msg_ctx->event_ctx;
951 }
952
953 /** @} **/