lib: Move "message_send_all" to serverid.c
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
109                               int *fds, size_t num_fds,
110                               void *private_data)
111 {
112         struct messaging_context *msg_ctx = talloc_get_type_abort(
113                 private_data, struct messaging_context);
114         struct server_id_buf idbuf;
115         struct messaging_rec rec;
116         int64_t fds64[MIN(num_fds, INT8_MAX)];
117         size_t i;
118
119         if (msg_len < MESSAGE_HDR_LENGTH) {
120                 DEBUG(1, ("message too short: %u\n", (unsigned)msg_len));
121                 goto close_fail;
122         }
123
124         if (num_fds > INT8_MAX) {
125                 DEBUG(1, ("too many fds: %u\n", (unsigned)num_fds));
126                 goto close_fail;
127         }
128
129         /*
130          * "consume" the fds by copying them and setting
131          * the original variable to -1
132          */
133         for (i=0; i < num_fds; i++) {
134                 fds64[i] = fds[i];
135                 fds[i] = -1;
136         }
137
138         rec = (struct messaging_rec) {
139                 .msg_version = MESSAGE_VERSION,
140                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
141                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
142                 .num_fds = num_fds,
143                 .fds = fds64,
144         };
145
146         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
147
148         DEBUG(10, ("%s: Received message 0x%x len %u (num_fds:%u) from %s\n",
149                    __func__, (unsigned)rec.msg_type,
150                    (unsigned)rec.buf.length,
151                    (unsigned)num_fds,
152                    server_id_str_buf(rec.src, &idbuf)));
153
154         messaging_dispatch_rec(msg_ctx, &rec);
155         return;
156
157 close_fail:
158         for (i=0; i < num_fds; i++) {
159                 close(fds[i]);
160         }
161 }
162
163 static int messaging_context_destructor(struct messaging_context *ctx)
164 {
165         unsigned i;
166
167         for (i=0; i<ctx->num_new_waiters; i++) {
168                 if (ctx->new_waiters[i] != NULL) {
169                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
170                         ctx->new_waiters[i] = NULL;
171                 }
172         }
173         for (i=0; i<ctx->num_waiters; i++) {
174                 if (ctx->waiters[i] != NULL) {
175                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
176                         ctx->waiters[i] = NULL;
177                 }
178         }
179
180         return 0;
181 }
182
183 static const char *private_path(const char *name)
184 {
185         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
186 }
187
188 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
189                                          struct tevent_context *ev)
190 {
191         struct messaging_context *ctx;
192         int ret;
193         const char *lck_path;
194         const char *priv_path;
195         bool ok;
196
197         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
198                 return NULL;
199         }
200
201         ctx->id = (struct server_id) {
202                 .pid = getpid(), .vnn = NONCLUSTER_VNN
203         };
204
205         ctx->event_ctx = ev;
206
207         sec_init();
208
209         lck_path = lock_path("msg.lock");
210         if (lck_path == NULL) {
211                 TALLOC_FREE(ctx);
212                 return NULL;
213         }
214
215         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
216                                               0755);
217         if (!ok) {
218                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
219                            __func__, strerror(errno)));
220                 TALLOC_FREE(ctx);
221                 return NULL;
222         }
223
224         priv_path = private_path("msg.sock");
225         if (priv_path == NULL) {
226                 TALLOC_FREE(ctx);
227                 return NULL;
228         }
229
230         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
231                                               0700);
232         if (!ok) {
233                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
234                            __func__, strerror(errno)));
235                 TALLOC_FREE(ctx);
236                 return NULL;
237         }
238
239         ctx->msg_dgm_ref = messaging_dgm_ref(
240                 ctx, ctx->event_ctx, &ctx->id.unique_id,
241                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
242
243         if (ctx->msg_dgm_ref == NULL) {
244                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
245                 TALLOC_FREE(ctx);
246                 return NULL;
247         }
248
249         talloc_set_destructor(ctx, messaging_context_destructor);
250
251         if (lp_clustering()) {
252                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
253
254                 if (ret != 0) {
255                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
256                                   strerror(ret)));
257                         TALLOC_FREE(ctx);
258                         return NULL;
259                 }
260         }
261         ctx->id.vnn = get_my_vnn();
262
263         ctx->names_db = server_id_db_init(
264                 ctx, ctx->id, lp_lock_directory(), 0,
265                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
266         if (ctx->names_db == NULL) {
267                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
268                 TALLOC_FREE(ctx);
269                 return NULL;
270         }
271
272         messaging_register(ctx, NULL, MSG_PING, ping_message);
273
274         /* Register some debugging related messages */
275
276         register_msg_pool_usage(ctx);
277         register_dmalloc_msgs(ctx);
278         debug_register_msgs(ctx);
279
280         {
281                 struct server_id_buf tmp;
282                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
283         }
284
285         return ctx;
286 }
287
288 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
289 {
290         return msg_ctx->id;
291 }
292
293 /*
294  * re-init after a fork
295  */
296 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
297 {
298         int ret;
299         char *lck_path;
300
301         TALLOC_FREE(msg_ctx->msg_dgm_ref);
302
303         msg_ctx->id = (struct server_id) {
304                 .pid = getpid(), .vnn = msg_ctx->id.vnn
305         };
306
307         lck_path = lock_path("msg.lock");
308         if (lck_path == NULL) {
309                 return NT_STATUS_NO_MEMORY;
310         }
311
312         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
313                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
314                 private_path("msg.sock"), lck_path,
315                 messaging_recv_cb, msg_ctx, &ret);
316
317         if (msg_ctx->msg_dgm_ref == NULL) {
318                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
319                 return map_nt_error_from_unix(ret);
320         }
321
322         if (lp_clustering()) {
323                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
324                                              msg_ctx->remote);
325
326                 if (ret != 0) {
327                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
328                                   strerror(ret)));
329                         return map_nt_error_from_unix(ret);
330                 }
331         }
332
333         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
334
335         return NT_STATUS_OK;
336 }
337
338
339 /*
340  * Register a dispatch function for a particular message type. Allow multiple
341  * registrants
342 */
343 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
344                             void *private_data,
345                             uint32_t msg_type,
346                             void (*fn)(struct messaging_context *msg,
347                                        void *private_data, 
348                                        uint32_t msg_type, 
349                                        struct server_id server_id,
350                                        DATA_BLOB *data))
351 {
352         struct messaging_callback *cb;
353
354         DEBUG(5, ("Registering messaging pointer for type %u - "
355                   "private_data=%p\n",
356                   (unsigned)msg_type, private_data));
357
358         /*
359          * Only one callback per type
360          */
361
362         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
363                 /* we allow a second registration of the same message
364                    type if it has a different private pointer. This is
365                    needed in, for example, the internal notify code,
366                    which creates a new notify context for each tree
367                    connect, and expects to receive messages to each of
368                    them. */
369                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
370                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
371                                   (unsigned)msg_type, private_data));
372                         cb->fn = fn;
373                         cb->private_data = private_data;
374                         return NT_STATUS_OK;
375                 }
376         }
377
378         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
379                 return NT_STATUS_NO_MEMORY;
380         }
381
382         cb->msg_type = msg_type;
383         cb->fn = fn;
384         cb->private_data = private_data;
385
386         DLIST_ADD(msg_ctx->callbacks, cb);
387         return NT_STATUS_OK;
388 }
389
390 /*
391   De-register the function for a particular message type.
392 */
393 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
394                           void *private_data)
395 {
396         struct messaging_callback *cb, *next;
397
398         for (cb = ctx->callbacks; cb; cb = next) {
399                 next = cb->next;
400                 if ((cb->msg_type == msg_type)
401                     && (cb->private_data == private_data)) {
402                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
403                                   (unsigned)msg_type, private_data));
404                         DLIST_REMOVE(ctx->callbacks, cb);
405                         TALLOC_FREE(cb);
406                 }
407         }
408 }
409
410 /*
411   Send a message to a particular server
412 */
413 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
414                         struct server_id server, uint32_t msg_type,
415                         const DATA_BLOB *data)
416 {
417         struct iovec iov = {0};
418
419         if (data != NULL) {
420                 iov.iov_base = data->data;
421                 iov.iov_len = data->length;
422         };
423
424         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
425 }
426
427 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
428                             struct server_id server, uint32_t msg_type,
429                             const uint8_t *buf, size_t len)
430 {
431         DATA_BLOB blob = data_blob_const(buf, len);
432         return messaging_send(msg_ctx, server, msg_type, &blob);
433 }
434
435 int messaging_send_iov_from(struct messaging_context *msg_ctx,
436                             struct server_id src, struct server_id dst,
437                             uint32_t msg_type,
438                             const struct iovec *iov, int iovlen,
439                             const int *fds, size_t num_fds)
440 {
441         int ret;
442         uint8_t hdr[MESSAGE_HDR_LENGTH];
443         struct iovec iov2[iovlen+1];
444
445         if (server_id_is_disconnected(&dst)) {
446                 return EINVAL;
447         }
448
449         if (num_fds > INT8_MAX) {
450                 return EINVAL;
451         }
452
453         if (dst.vnn != msg_ctx->id.vnn) {
454                 if (num_fds > 0) {
455                         return ENOSYS;
456                 }
457
458                 ret = msg_ctx->remote->send_fn(src, dst,
459                                                msg_type, iov, iovlen,
460                                                NULL, 0,
461                                                msg_ctx->remote);
462                 return ret;
463         }
464
465         message_hdr_put(hdr, msg_type, src, dst);
466         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
467         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
468
469         become_root();
470         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
471         unbecome_root();
472
473         return ret;
474 }
475
476 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
477                             struct server_id dst, uint32_t msg_type,
478                             const struct iovec *iov, int iovlen,
479                             const int *fds, size_t num_fds)
480 {
481         int ret;
482
483         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
484                                       iov, iovlen, fds, num_fds);
485         if (ret != 0) {
486                 return map_nt_error_from_unix(ret);
487         }
488         return NT_STATUS_OK;
489 }
490
491 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
492                                                struct messaging_rec *rec)
493 {
494         struct messaging_rec *result;
495         size_t fds_size = sizeof(int64_t) * rec->num_fds;
496
497         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
498                                       rec->buf.length + fds_size);
499         if (result == NULL) {
500                 return NULL;
501         }
502         *result = *rec;
503
504         /* Doesn't fail, see talloc_pooled_object */
505
506         result->buf.data = talloc_memdup(result, rec->buf.data,
507                                          rec->buf.length);
508
509         result->fds = NULL;
510         if (result->num_fds > 0) {
511                 result->fds = talloc_memdup(result, rec->fds, fds_size);
512         }
513
514         return result;
515 }
516
517 struct messaging_filtered_read_state {
518         struct tevent_context *ev;
519         struct messaging_context *msg_ctx;
520         void *tevent_handle;
521
522         bool (*filter)(struct messaging_rec *rec, void *private_data);
523         void *private_data;
524
525         struct messaging_rec *rec;
526 };
527
528 static void messaging_filtered_read_cleanup(struct tevent_req *req,
529                                             enum tevent_req_state req_state);
530
531 struct tevent_req *messaging_filtered_read_send(
532         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
533         struct messaging_context *msg_ctx,
534         bool (*filter)(struct messaging_rec *rec, void *private_data),
535         void *private_data)
536 {
537         struct tevent_req *req;
538         struct messaging_filtered_read_state *state;
539         size_t new_waiters_len;
540
541         req = tevent_req_create(mem_ctx, &state,
542                                 struct messaging_filtered_read_state);
543         if (req == NULL) {
544                 return NULL;
545         }
546         state->ev = ev;
547         state->msg_ctx = msg_ctx;
548         state->filter = filter;
549         state->private_data = private_data;
550
551         /*
552          * We have to defer the callback here, as we might be called from
553          * within a different tevent_context than state->ev
554          */
555         tevent_req_defer_callback(req, state->ev);
556
557         state->tevent_handle = messaging_dgm_register_tevent_context(
558                 state, ev);
559         if (tevent_req_nomem(state->tevent_handle, req)) {
560                 return tevent_req_post(req, ev);
561         }
562
563         /*
564          * We add ourselves to the "new_waiters" array, not the "waiters"
565          * array. If we are called from within messaging_read_done,
566          * messaging_dispatch_rec will be in an active for-loop on
567          * "waiters". We must be careful not to mess with this array, because
568          * it could mean that a single event is being delivered twice.
569          */
570
571         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
572
573         if (new_waiters_len == msg_ctx->num_new_waiters) {
574                 struct tevent_req **tmp;
575
576                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
577                                      struct tevent_req *, new_waiters_len+1);
578                 if (tevent_req_nomem(tmp, req)) {
579                         return tevent_req_post(req, ev);
580                 }
581                 msg_ctx->new_waiters = tmp;
582         }
583
584         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
585         msg_ctx->num_new_waiters += 1;
586         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
587
588         return req;
589 }
590
591 static void messaging_filtered_read_cleanup(struct tevent_req *req,
592                                             enum tevent_req_state req_state)
593 {
594         struct messaging_filtered_read_state *state = tevent_req_data(
595                 req, struct messaging_filtered_read_state);
596         struct messaging_context *msg_ctx = state->msg_ctx;
597         unsigned i;
598
599         tevent_req_set_cleanup_fn(req, NULL);
600
601         TALLOC_FREE(state->tevent_handle);
602
603         /*
604          * Just set the [new_]waiters entry to NULL, be careful not to mess
605          * with the other "waiters" array contents. We are often called from
606          * within "messaging_dispatch_rec", which loops over
607          * "waiters". Messing with the "waiters" array will mess up that
608          * for-loop.
609          */
610
611         for (i=0; i<msg_ctx->num_waiters; i++) {
612                 if (msg_ctx->waiters[i] == req) {
613                         msg_ctx->waiters[i] = NULL;
614                         return;
615                 }
616         }
617
618         for (i=0; i<msg_ctx->num_new_waiters; i++) {
619                 if (msg_ctx->new_waiters[i] == req) {
620                         msg_ctx->new_waiters[i] = NULL;
621                         return;
622                 }
623         }
624 }
625
626 static void messaging_filtered_read_done(struct tevent_req *req,
627                                          struct messaging_rec *rec)
628 {
629         struct messaging_filtered_read_state *state = tevent_req_data(
630                 req, struct messaging_filtered_read_state);
631
632         state->rec = messaging_rec_dup(state, rec);
633         if (tevent_req_nomem(state->rec, req)) {
634                 return;
635         }
636         tevent_req_done(req);
637 }
638
639 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
640                                  struct messaging_rec **presult)
641 {
642         struct messaging_filtered_read_state *state = tevent_req_data(
643                 req, struct messaging_filtered_read_state);
644         int err;
645
646         if (tevent_req_is_unix_error(req, &err)) {
647                 tevent_req_received(req);
648                 return err;
649         }
650         *presult = talloc_move(mem_ctx, &state->rec);
651         return 0;
652 }
653
654 struct messaging_read_state {
655         uint32_t msg_type;
656         struct messaging_rec *rec;
657 };
658
659 static bool messaging_read_filter(struct messaging_rec *rec,
660                                   void *private_data);
661 static void messaging_read_done(struct tevent_req *subreq);
662
663 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
664                                        struct tevent_context *ev,
665                                        struct messaging_context *msg,
666                                        uint32_t msg_type)
667 {
668         struct tevent_req *req, *subreq;
669         struct messaging_read_state *state;
670
671         req = tevent_req_create(mem_ctx, &state,
672                                 struct messaging_read_state);
673         if (req == NULL) {
674                 return NULL;
675         }
676         state->msg_type = msg_type;
677
678         subreq = messaging_filtered_read_send(state, ev, msg,
679                                               messaging_read_filter, state);
680         if (tevent_req_nomem(subreq, req)) {
681                 return tevent_req_post(req, ev);
682         }
683         tevent_req_set_callback(subreq, messaging_read_done, req);
684         return req;
685 }
686
687 static bool messaging_read_filter(struct messaging_rec *rec,
688                                   void *private_data)
689 {
690         struct messaging_read_state *state = talloc_get_type_abort(
691                 private_data, struct messaging_read_state);
692
693         if (rec->num_fds != 0) {
694                 return false;
695         }
696
697         return rec->msg_type == state->msg_type;
698 }
699
700 static void messaging_read_done(struct tevent_req *subreq)
701 {
702         struct tevent_req *req = tevent_req_callback_data(
703                 subreq, struct tevent_req);
704         struct messaging_read_state *state = tevent_req_data(
705                 req, struct messaging_read_state);
706         int ret;
707
708         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
709         TALLOC_FREE(subreq);
710         if (tevent_req_error(req, ret)) {
711                 return;
712         }
713         tevent_req_done(req);
714 }
715
716 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
717                         struct messaging_rec **presult)
718 {
719         struct messaging_read_state *state = tevent_req_data(
720                 req, struct messaging_read_state);
721         int err;
722
723         if (tevent_req_is_unix_error(req, &err)) {
724                 return err;
725         }
726         if (presult != NULL) {
727                 *presult = talloc_move(mem_ctx, &state->rec);
728         }
729         return 0;
730 }
731
732 struct messaging_handler_state {
733         struct tevent_context *ev;
734         struct messaging_context *msg_ctx;
735         uint32_t msg_type;
736         bool (*handler)(struct messaging_context *msg_ctx,
737                         struct messaging_rec **rec, void *private_data);
738         void *private_data;
739 };
740
741 static void messaging_handler_got_msg(struct tevent_req *subreq);
742
743 struct tevent_req *messaging_handler_send(
744         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
745         struct messaging_context *msg_ctx, uint32_t msg_type,
746         bool (*handler)(struct messaging_context *msg_ctx,
747                         struct messaging_rec **rec, void *private_data),
748         void *private_data)
749 {
750         struct tevent_req *req, *subreq;
751         struct messaging_handler_state *state;
752
753         req = tevent_req_create(mem_ctx, &state,
754                                 struct messaging_handler_state);
755         if (req == NULL) {
756                 return NULL;
757         }
758         state->ev = ev;
759         state->msg_ctx = msg_ctx;
760         state->msg_type = msg_type;
761         state->handler = handler;
762         state->private_data = private_data;
763
764         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
765                                      state->msg_type);
766         if (tevent_req_nomem(subreq, req)) {
767                 return tevent_req_post(req, ev);
768         }
769         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
770         return req;
771 }
772
773 static void messaging_handler_got_msg(struct tevent_req *subreq)
774 {
775         struct tevent_req *req = tevent_req_callback_data(
776                 subreq, struct tevent_req);
777         struct messaging_handler_state *state = tevent_req_data(
778                 req, struct messaging_handler_state);
779         struct messaging_rec *rec;
780         int ret;
781         bool ok;
782
783         ret = messaging_read_recv(subreq, state, &rec);
784         TALLOC_FREE(subreq);
785         if (tevent_req_error(req, ret)) {
786                 return;
787         }
788
789         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
790                                      state->msg_type);
791         if (tevent_req_nomem(subreq, req)) {
792                 return;
793         }
794         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
795
796         ok = state->handler(state->msg_ctx, &rec, state->private_data);
797         TALLOC_FREE(rec);
798         if (ok) {
799                 /*
800                  * Next round
801                  */
802                 return;
803         }
804         TALLOC_FREE(subreq);
805         tevent_req_done(req);
806 }
807
808 int messaging_handler_recv(struct tevent_req *req)
809 {
810         return tevent_req_simple_recv_unix(req);
811 }
812
813 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
814 {
815         if (msg_ctx->num_new_waiters == 0) {
816                 return true;
817         }
818
819         if (talloc_array_length(msg_ctx->waiters) <
820             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
821                 struct tevent_req **tmp;
822                 tmp = talloc_realloc(
823                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
824                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
825                 if (tmp == NULL) {
826                         DEBUG(1, ("%s: talloc failed\n", __func__));
827                         return false;
828                 }
829                 msg_ctx->waiters = tmp;
830         }
831
832         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
833                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
834
835         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
836         msg_ctx->num_new_waiters = 0;
837
838         return true;
839 }
840
841 /*
842   Dispatch one messaging_rec
843 */
844 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
845                                    struct messaging_rec *rec)
846 {
847         struct messaging_callback *cb, *next;
848         unsigned i;
849         size_t j;
850
851         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
852                 next = cb->next;
853                 if (cb->msg_type != rec->msg_type) {
854                         continue;
855                 }
856
857                 /*
858                  * the old style callbacks don't support fd passing
859                  */
860                 for (j=0; j < rec->num_fds; j++) {
861                         int fd = rec->fds[j];
862                         close(fd);
863                 }
864                 rec->num_fds = 0;
865                 rec->fds = NULL;
866
867                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
868                        rec->src, &rec->buf);
869
870                 /*
871                  * we continue looking for matching messages after finding
872                  * one. This matters for subsystems like the internal notify
873                  * code which register more than one handler for the same
874                  * message type
875                  */
876         }
877
878         if (!messaging_append_new_waiters(msg_ctx)) {
879                 for (j=0; j < rec->num_fds; j++) {
880                         int fd = rec->fds[j];
881                         close(fd);
882                 }
883                 rec->num_fds = 0;
884                 rec->fds = NULL;
885                 return;
886         }
887
888         i = 0;
889         while (i < msg_ctx->num_waiters) {
890                 struct tevent_req *req;
891                 struct messaging_filtered_read_state *state;
892
893                 req = msg_ctx->waiters[i];
894                 if (req == NULL) {
895                         /*
896                          * This got cleaned up. In the meantime,
897                          * move everything down one. We need
898                          * to keep the order of waiters, as
899                          * other code may depend on this.
900                          */
901                         if (i < msg_ctx->num_waiters - 1) {
902                                 memmove(&msg_ctx->waiters[i],
903                                         &msg_ctx->waiters[i+1],
904                                         sizeof(struct tevent_req *) *
905                                             (msg_ctx->num_waiters - i - 1));
906                         }
907                         msg_ctx->num_waiters -= 1;
908                         continue;
909                 }
910
911                 state = tevent_req_data(
912                         req, struct messaging_filtered_read_state);
913                 if (state->filter(rec, state->private_data)) {
914                         messaging_filtered_read_done(req, rec);
915
916                         /*
917                          * Only the first one gets the fd-array
918                          */
919                         rec->num_fds = 0;
920                         rec->fds = NULL;
921                 }
922
923                 i += 1;
924         }
925
926         /*
927          * If the fd-array isn't used, just close it.
928          */
929         for (j=0; j < rec->num_fds; j++) {
930                 int fd = rec->fds[j];
931                 close(fd);
932         }
933         rec->num_fds = 0;
934         rec->fds = NULL;
935 }
936
937 static int mess_parent_dgm_cleanup(void *private_data);
938 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
939
940 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
941 {
942         struct tevent_req *req;
943
944         req = background_job_send(
945                 msg, msg->event_ctx, msg, NULL, 0,
946                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
947                             60*15),
948                 mess_parent_dgm_cleanup, msg);
949         if (req == NULL) {
950                 return false;
951         }
952         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
953         return true;
954 }
955
956 static int mess_parent_dgm_cleanup(void *private_data)
957 {
958         int ret;
959
960         ret = messaging_dgm_wipe();
961         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
962                    ret ? strerror(ret) : "ok"));
963         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
964                            60*15);
965 }
966
967 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
968 {
969         struct messaging_context *msg = tevent_req_callback_data(
970                 req, struct messaging_context);
971         NTSTATUS status;
972
973         status = background_job_recv(req);
974         TALLOC_FREE(req);
975         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
976                   nt_errstr(status)));
977
978         req = background_job_send(
979                 msg, msg->event_ctx, msg, NULL, 0,
980                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
981                             60*15),
982                 mess_parent_dgm_cleanup, msg);
983         if (req == NULL) {
984                 DEBUG(1, ("background_job_send failed\n"));
985                 return;
986         }
987         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
988 }
989
990 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
991 {
992         int ret;
993
994         if (pid == 0) {
995                 ret = messaging_dgm_wipe();
996         } else {
997                 ret = messaging_dgm_cleanup(pid);
998         }
999
1000         return ret;
1001 }
1002
1003 struct tevent_context *messaging_tevent_context(
1004         struct messaging_context *msg_ctx)
1005 {
1006         return msg_ctx->event_ctx;
1007 }
1008
1009 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1010 {
1011         return msg_ctx->names_db;
1012 }
1013
1014 /** @} **/