lib: Add messaging_rec_create
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
87                                                struct messaging_rec *rec);
88 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
89                                    struct messaging_rec *rec);
90
91 /****************************************************************************
92  A useful function for testing the message system.
93 ****************************************************************************/
94
95 static void ping_message(struct messaging_context *msg_ctx,
96                          void *private_data,
97                          uint32_t msg_type,
98                          struct server_id src,
99                          DATA_BLOB *data)
100 {
101         struct server_id_buf idbuf;
102
103         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
104                   server_id_str_buf(src, &idbuf), (int)data->length,
105                   data->data ? (char *)data->data : ""));
106
107         messaging_send(msg_ctx, src, MSG_PONG, data);
108 }
109
110 static struct messaging_rec *messaging_rec_create(
111         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
112         uint32_t msg_type, const struct iovec *iov, int iovlen,
113         const int *fds, size_t num_fds)
114 {
115         ssize_t buflen;
116         uint8_t *buf;
117         struct messaging_rec *result;
118
119         if (num_fds > INT8_MAX) {
120                 return NULL;
121         }
122
123         buflen = iov_buflen(iov, iovlen);
124         if (buflen == -1) {
125                 return NULL;
126         }
127         buf = talloc_array(mem_ctx, uint8_t, buflen);
128         if (buf == NULL) {
129                 return NULL;
130         }
131         iov_buf(iov, iovlen, buf, buflen);
132
133         {
134                 struct messaging_rec rec;
135                 int64_t fds64[num_fds];
136                 size_t i;
137
138                 for (i=0; i<num_fds; i++) {
139                         fds64[i] = fds[i];
140                 }
141
142                 rec = (struct messaging_rec) {
143                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
144                         .src = src, .dest = dst,
145                         .buf.data = buf, .buf.length = buflen,
146                         .num_fds = num_fds, .fds = fds64,
147                 };
148
149                 result = messaging_rec_dup(mem_ctx, &rec);
150         }
151
152         TALLOC_FREE(buf);
153
154         return result;
155 }
156
157 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
158                               int *fds, size_t num_fds,
159                               void *private_data)
160 {
161         struct messaging_context *msg_ctx = talloc_get_type_abort(
162                 private_data, struct messaging_context);
163         struct server_id_buf idbuf;
164         struct messaging_rec rec;
165         int64_t fds64[MIN(num_fds, INT8_MAX)];
166         size_t i;
167
168         if (msg_len < MESSAGE_HDR_LENGTH) {
169                 DBG_WARNING("message too short: %zu\n", msg_len);
170                 goto close_fail;
171         }
172
173         if (num_fds > INT8_MAX) {
174                 DBG_WARNING("too many fds: %zu\n", num_fds);
175                 goto close_fail;
176         }
177
178         /*
179          * "consume" the fds by copying them and setting
180          * the original variable to -1
181          */
182         for (i=0; i < num_fds; i++) {
183                 fds64[i] = fds[i];
184                 fds[i] = -1;
185         }
186
187         rec = (struct messaging_rec) {
188                 .msg_version = MESSAGE_VERSION,
189                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
190                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
191                 .num_fds = num_fds,
192                 .fds = fds64,
193         };
194
195         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
196
197         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
198                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
199                   server_id_str_buf(rec.src, &idbuf));
200
201         messaging_dispatch_rec(msg_ctx, &rec);
202         return;
203
204 close_fail:
205         for (i=0; i < num_fds; i++) {
206                 close(fds[i]);
207         }
208 }
209
210 static int messaging_context_destructor(struct messaging_context *ctx)
211 {
212         unsigned i;
213
214         for (i=0; i<ctx->num_new_waiters; i++) {
215                 if (ctx->new_waiters[i] != NULL) {
216                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
217                         ctx->new_waiters[i] = NULL;
218                 }
219         }
220         for (i=0; i<ctx->num_waiters; i++) {
221                 if (ctx->waiters[i] != NULL) {
222                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
223                         ctx->waiters[i] = NULL;
224                 }
225         }
226
227         return 0;
228 }
229
230 static const char *private_path(const char *name)
231 {
232         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
233 }
234
235 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
236                                          struct tevent_context *ev)
237 {
238         struct messaging_context *ctx;
239         int ret;
240         const char *lck_path;
241         const char *priv_path;
242         bool ok;
243
244         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
245                 return NULL;
246         }
247
248         ctx->id = (struct server_id) {
249                 .pid = getpid(), .vnn = NONCLUSTER_VNN
250         };
251
252         ctx->event_ctx = ev;
253
254         sec_init();
255
256         lck_path = lock_path("msg.lock");
257         if (lck_path == NULL) {
258                 TALLOC_FREE(ctx);
259                 return NULL;
260         }
261
262         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
263                                               0755);
264         if (!ok) {
265                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
266                            __func__, strerror(errno)));
267                 TALLOC_FREE(ctx);
268                 return NULL;
269         }
270
271         priv_path = private_path("msg.sock");
272         if (priv_path == NULL) {
273                 TALLOC_FREE(ctx);
274                 return NULL;
275         }
276
277         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
278                                               0700);
279         if (!ok) {
280                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
281                            __func__, strerror(errno)));
282                 TALLOC_FREE(ctx);
283                 return NULL;
284         }
285
286         ctx->msg_dgm_ref = messaging_dgm_ref(
287                 ctx, ctx->event_ctx, &ctx->id.unique_id,
288                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
289
290         if (ctx->msg_dgm_ref == NULL) {
291                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
292                 TALLOC_FREE(ctx);
293                 return NULL;
294         }
295
296         talloc_set_destructor(ctx, messaging_context_destructor);
297
298         if (lp_clustering()) {
299                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
300
301                 if (ret != 0) {
302                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
303                                   strerror(ret)));
304                         TALLOC_FREE(ctx);
305                         return NULL;
306                 }
307         }
308         ctx->id.vnn = get_my_vnn();
309
310         ctx->names_db = server_id_db_init(
311                 ctx, ctx->id, lp_lock_directory(), 0,
312                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
313         if (ctx->names_db == NULL) {
314                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
315                 TALLOC_FREE(ctx);
316                 return NULL;
317         }
318
319         messaging_register(ctx, NULL, MSG_PING, ping_message);
320
321         /* Register some debugging related messages */
322
323         register_msg_pool_usage(ctx);
324         register_dmalloc_msgs(ctx);
325         debug_register_msgs(ctx);
326
327         {
328                 struct server_id_buf tmp;
329                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
330         }
331
332         return ctx;
333 }
334
335 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
336 {
337         return msg_ctx->id;
338 }
339
340 /*
341  * re-init after a fork
342  */
343 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
344 {
345         int ret;
346         char *lck_path;
347
348         TALLOC_FREE(msg_ctx->msg_dgm_ref);
349
350         msg_ctx->id = (struct server_id) {
351                 .pid = getpid(), .vnn = msg_ctx->id.vnn
352         };
353
354         lck_path = lock_path("msg.lock");
355         if (lck_path == NULL) {
356                 return NT_STATUS_NO_MEMORY;
357         }
358
359         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
360                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
361                 private_path("msg.sock"), lck_path,
362                 messaging_recv_cb, msg_ctx, &ret);
363
364         if (msg_ctx->msg_dgm_ref == NULL) {
365                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
366                 return map_nt_error_from_unix(ret);
367         }
368
369         if (lp_clustering()) {
370                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
371                                              msg_ctx->remote);
372
373                 if (ret != 0) {
374                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
375                                   strerror(ret)));
376                         return map_nt_error_from_unix(ret);
377                 }
378         }
379
380         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
381
382         return NT_STATUS_OK;
383 }
384
385
386 /*
387  * Register a dispatch function for a particular message type. Allow multiple
388  * registrants
389 */
390 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
391                             void *private_data,
392                             uint32_t msg_type,
393                             void (*fn)(struct messaging_context *msg,
394                                        void *private_data, 
395                                        uint32_t msg_type, 
396                                        struct server_id server_id,
397                                        DATA_BLOB *data))
398 {
399         struct messaging_callback *cb;
400
401         DEBUG(5, ("Registering messaging pointer for type %u - "
402                   "private_data=%p\n",
403                   (unsigned)msg_type, private_data));
404
405         /*
406          * Only one callback per type
407          */
408
409         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
410                 /* we allow a second registration of the same message
411                    type if it has a different private pointer. This is
412                    needed in, for example, the internal notify code,
413                    which creates a new notify context for each tree
414                    connect, and expects to receive messages to each of
415                    them. */
416                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
417                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
418                                   (unsigned)msg_type, private_data));
419                         cb->fn = fn;
420                         cb->private_data = private_data;
421                         return NT_STATUS_OK;
422                 }
423         }
424
425         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
426                 return NT_STATUS_NO_MEMORY;
427         }
428
429         cb->msg_type = msg_type;
430         cb->fn = fn;
431         cb->private_data = private_data;
432
433         DLIST_ADD(msg_ctx->callbacks, cb);
434         return NT_STATUS_OK;
435 }
436
437 /*
438   De-register the function for a particular message type.
439 */
440 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
441                           void *private_data)
442 {
443         struct messaging_callback *cb, *next;
444
445         for (cb = ctx->callbacks; cb; cb = next) {
446                 next = cb->next;
447                 if ((cb->msg_type == msg_type)
448                     && (cb->private_data == private_data)) {
449                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
450                                   (unsigned)msg_type, private_data));
451                         DLIST_REMOVE(ctx->callbacks, cb);
452                         TALLOC_FREE(cb);
453                 }
454         }
455 }
456
457 /*
458   Send a message to a particular server
459 */
460 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
461                         struct server_id server, uint32_t msg_type,
462                         const DATA_BLOB *data)
463 {
464         struct iovec iov = {0};
465
466         if (data != NULL) {
467                 iov.iov_base = data->data;
468                 iov.iov_len = data->length;
469         };
470
471         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
472 }
473
474 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
475                             struct server_id server, uint32_t msg_type,
476                             const uint8_t *buf, size_t len)
477 {
478         DATA_BLOB blob = data_blob_const(buf, len);
479         return messaging_send(msg_ctx, server, msg_type, &blob);
480 }
481
482 int messaging_send_iov_from(struct messaging_context *msg_ctx,
483                             struct server_id src, struct server_id dst,
484                             uint32_t msg_type,
485                             const struct iovec *iov, int iovlen,
486                             const int *fds, size_t num_fds)
487 {
488         int ret;
489         uint8_t hdr[MESSAGE_HDR_LENGTH];
490         struct iovec iov2[iovlen+1];
491
492         if (server_id_is_disconnected(&dst)) {
493                 return EINVAL;
494         }
495
496         if (num_fds > INT8_MAX) {
497                 return EINVAL;
498         }
499
500         if (dst.vnn != msg_ctx->id.vnn) {
501                 if (num_fds > 0) {
502                         return ENOSYS;
503                 }
504
505                 ret = msg_ctx->remote->send_fn(src, dst,
506                                                msg_type, iov, iovlen,
507                                                NULL, 0,
508                                                msg_ctx->remote);
509                 return ret;
510         }
511
512         message_hdr_put(hdr, msg_type, src, dst);
513         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
514         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
515
516         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
517
518         if (ret == EACCES) {
519                 become_root();
520                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
521                                          fds, num_fds);
522                 unbecome_root();
523         }
524
525         return ret;
526 }
527
528 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
529                             struct server_id dst, uint32_t msg_type,
530                             const struct iovec *iov, int iovlen,
531                             const int *fds, size_t num_fds)
532 {
533         int ret;
534
535         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
536                                       iov, iovlen, fds, num_fds);
537         if (ret != 0) {
538                 return map_nt_error_from_unix(ret);
539         }
540         return NT_STATUS_OK;
541 }
542
543 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
544                                                struct messaging_rec *rec)
545 {
546         struct messaging_rec *result;
547         size_t fds_size = sizeof(int64_t) * rec->num_fds;
548         size_t payload_len;
549
550         payload_len = rec->buf.length + fds_size;
551         if (payload_len < rec->buf.length) {
552                 /* overflow */
553                 return NULL;
554         }
555
556         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
557                                       payload_len);
558         if (result == NULL) {
559                 return NULL;
560         }
561         *result = *rec;
562
563         /* Doesn't fail, see talloc_pooled_object */
564
565         result->buf.data = talloc_memdup(result, rec->buf.data,
566                                          rec->buf.length);
567
568         result->fds = NULL;
569         if (result->num_fds > 0) {
570                 result->fds = talloc_memdup(result, rec->fds, fds_size);
571         }
572
573         return result;
574 }
575
576 struct messaging_filtered_read_state {
577         struct tevent_context *ev;
578         struct messaging_context *msg_ctx;
579         void *tevent_handle;
580
581         bool (*filter)(struct messaging_rec *rec, void *private_data);
582         void *private_data;
583
584         struct messaging_rec *rec;
585 };
586
587 static void messaging_filtered_read_cleanup(struct tevent_req *req,
588                                             enum tevent_req_state req_state);
589
590 struct tevent_req *messaging_filtered_read_send(
591         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
592         struct messaging_context *msg_ctx,
593         bool (*filter)(struct messaging_rec *rec, void *private_data),
594         void *private_data)
595 {
596         struct tevent_req *req;
597         struct messaging_filtered_read_state *state;
598         size_t new_waiters_len;
599
600         req = tevent_req_create(mem_ctx, &state,
601                                 struct messaging_filtered_read_state);
602         if (req == NULL) {
603                 return NULL;
604         }
605         state->ev = ev;
606         state->msg_ctx = msg_ctx;
607         state->filter = filter;
608         state->private_data = private_data;
609
610         /*
611          * We have to defer the callback here, as we might be called from
612          * within a different tevent_context than state->ev
613          */
614         tevent_req_defer_callback(req, state->ev);
615
616         state->tevent_handle = messaging_dgm_register_tevent_context(
617                 state, ev);
618         if (tevent_req_nomem(state->tevent_handle, req)) {
619                 return tevent_req_post(req, ev);
620         }
621
622         /*
623          * We add ourselves to the "new_waiters" array, not the "waiters"
624          * array. If we are called from within messaging_read_done,
625          * messaging_dispatch_rec will be in an active for-loop on
626          * "waiters". We must be careful not to mess with this array, because
627          * it could mean that a single event is being delivered twice.
628          */
629
630         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
631
632         if (new_waiters_len == msg_ctx->num_new_waiters) {
633                 struct tevent_req **tmp;
634
635                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
636                                      struct tevent_req *, new_waiters_len+1);
637                 if (tevent_req_nomem(tmp, req)) {
638                         return tevent_req_post(req, ev);
639                 }
640                 msg_ctx->new_waiters = tmp;
641         }
642
643         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
644         msg_ctx->num_new_waiters += 1;
645         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
646
647         return req;
648 }
649
650 static void messaging_filtered_read_cleanup(struct tevent_req *req,
651                                             enum tevent_req_state req_state)
652 {
653         struct messaging_filtered_read_state *state = tevent_req_data(
654                 req, struct messaging_filtered_read_state);
655         struct messaging_context *msg_ctx = state->msg_ctx;
656         unsigned i;
657
658         tevent_req_set_cleanup_fn(req, NULL);
659
660         TALLOC_FREE(state->tevent_handle);
661
662         /*
663          * Just set the [new_]waiters entry to NULL, be careful not to mess
664          * with the other "waiters" array contents. We are often called from
665          * within "messaging_dispatch_rec", which loops over
666          * "waiters". Messing with the "waiters" array will mess up that
667          * for-loop.
668          */
669
670         for (i=0; i<msg_ctx->num_waiters; i++) {
671                 if (msg_ctx->waiters[i] == req) {
672                         msg_ctx->waiters[i] = NULL;
673                         return;
674                 }
675         }
676
677         for (i=0; i<msg_ctx->num_new_waiters; i++) {
678                 if (msg_ctx->new_waiters[i] == req) {
679                         msg_ctx->new_waiters[i] = NULL;
680                         return;
681                 }
682         }
683 }
684
685 static void messaging_filtered_read_done(struct tevent_req *req,
686                                          struct messaging_rec *rec)
687 {
688         struct messaging_filtered_read_state *state = tevent_req_data(
689                 req, struct messaging_filtered_read_state);
690
691         state->rec = messaging_rec_dup(state, rec);
692         if (tevent_req_nomem(state->rec, req)) {
693                 return;
694         }
695         tevent_req_done(req);
696 }
697
698 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
699                                  struct messaging_rec **presult)
700 {
701         struct messaging_filtered_read_state *state = tevent_req_data(
702                 req, struct messaging_filtered_read_state);
703         int err;
704
705         if (tevent_req_is_unix_error(req, &err)) {
706                 tevent_req_received(req);
707                 return err;
708         }
709         if (presult != NULL) {
710                 *presult = talloc_move(mem_ctx, &state->rec);
711         }
712         return 0;
713 }
714
715 struct messaging_read_state {
716         uint32_t msg_type;
717         struct messaging_rec *rec;
718 };
719
720 static bool messaging_read_filter(struct messaging_rec *rec,
721                                   void *private_data);
722 static void messaging_read_done(struct tevent_req *subreq);
723
724 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
725                                        struct tevent_context *ev,
726                                        struct messaging_context *msg,
727                                        uint32_t msg_type)
728 {
729         struct tevent_req *req, *subreq;
730         struct messaging_read_state *state;
731
732         req = tevent_req_create(mem_ctx, &state,
733                                 struct messaging_read_state);
734         if (req == NULL) {
735                 return NULL;
736         }
737         state->msg_type = msg_type;
738
739         subreq = messaging_filtered_read_send(state, ev, msg,
740                                               messaging_read_filter, state);
741         if (tevent_req_nomem(subreq, req)) {
742                 return tevent_req_post(req, ev);
743         }
744         tevent_req_set_callback(subreq, messaging_read_done, req);
745         return req;
746 }
747
748 static bool messaging_read_filter(struct messaging_rec *rec,
749                                   void *private_data)
750 {
751         struct messaging_read_state *state = talloc_get_type_abort(
752                 private_data, struct messaging_read_state);
753
754         if (rec->num_fds != 0) {
755                 return false;
756         }
757
758         return rec->msg_type == state->msg_type;
759 }
760
761 static void messaging_read_done(struct tevent_req *subreq)
762 {
763         struct tevent_req *req = tevent_req_callback_data(
764                 subreq, struct tevent_req);
765         struct messaging_read_state *state = tevent_req_data(
766                 req, struct messaging_read_state);
767         int ret;
768
769         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
770         TALLOC_FREE(subreq);
771         if (tevent_req_error(req, ret)) {
772                 return;
773         }
774         tevent_req_done(req);
775 }
776
777 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
778                         struct messaging_rec **presult)
779 {
780         struct messaging_read_state *state = tevent_req_data(
781                 req, struct messaging_read_state);
782         int err;
783
784         if (tevent_req_is_unix_error(req, &err)) {
785                 return err;
786         }
787         if (presult != NULL) {
788                 *presult = talloc_move(mem_ctx, &state->rec);
789         }
790         return 0;
791 }
792
793 struct messaging_handler_state {
794         struct tevent_context *ev;
795         struct messaging_context *msg_ctx;
796         uint32_t msg_type;
797         bool (*handler)(struct messaging_context *msg_ctx,
798                         struct messaging_rec **rec, void *private_data);
799         void *private_data;
800 };
801
802 static void messaging_handler_got_msg(struct tevent_req *subreq);
803
804 struct tevent_req *messaging_handler_send(
805         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
806         struct messaging_context *msg_ctx, uint32_t msg_type,
807         bool (*handler)(struct messaging_context *msg_ctx,
808                         struct messaging_rec **rec, void *private_data),
809         void *private_data)
810 {
811         struct tevent_req *req, *subreq;
812         struct messaging_handler_state *state;
813
814         req = tevent_req_create(mem_ctx, &state,
815                                 struct messaging_handler_state);
816         if (req == NULL) {
817                 return NULL;
818         }
819         state->ev = ev;
820         state->msg_ctx = msg_ctx;
821         state->msg_type = msg_type;
822         state->handler = handler;
823         state->private_data = private_data;
824
825         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
826                                      state->msg_type);
827         if (tevent_req_nomem(subreq, req)) {
828                 return tevent_req_post(req, ev);
829         }
830         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
831         return req;
832 }
833
834 static void messaging_handler_got_msg(struct tevent_req *subreq)
835 {
836         struct tevent_req *req = tevent_req_callback_data(
837                 subreq, struct tevent_req);
838         struct messaging_handler_state *state = tevent_req_data(
839                 req, struct messaging_handler_state);
840         struct messaging_rec *rec;
841         int ret;
842         bool ok;
843
844         ret = messaging_read_recv(subreq, state, &rec);
845         TALLOC_FREE(subreq);
846         if (tevent_req_error(req, ret)) {
847                 return;
848         }
849
850         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
851                                      state->msg_type);
852         if (tevent_req_nomem(subreq, req)) {
853                 return;
854         }
855         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
856
857         ok = state->handler(state->msg_ctx, &rec, state->private_data);
858         TALLOC_FREE(rec);
859         if (ok) {
860                 /*
861                  * Next round
862                  */
863                 return;
864         }
865         TALLOC_FREE(subreq);
866         tevent_req_done(req);
867 }
868
869 int messaging_handler_recv(struct tevent_req *req)
870 {
871         return tevent_req_simple_recv_unix(req);
872 }
873
874 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
875 {
876         if (msg_ctx->num_new_waiters == 0) {
877                 return true;
878         }
879
880         if (talloc_array_length(msg_ctx->waiters) <
881             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
882                 struct tevent_req **tmp;
883                 tmp = talloc_realloc(
884                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
885                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
886                 if (tmp == NULL) {
887                         DEBUG(1, ("%s: talloc failed\n", __func__));
888                         return false;
889                 }
890                 msg_ctx->waiters = tmp;
891         }
892
893         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
894                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
895
896         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
897         msg_ctx->num_new_waiters = 0;
898
899         return true;
900 }
901
902 /*
903   Dispatch one messaging_rec
904 */
905 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
906                                    struct messaging_rec *rec)
907 {
908         struct messaging_callback *cb, *next;
909         unsigned i;
910         size_t j;
911
912         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
913                 next = cb->next;
914                 if (cb->msg_type != rec->msg_type) {
915                         continue;
916                 }
917
918                 /*
919                  * the old style callbacks don't support fd passing
920                  */
921                 for (j=0; j < rec->num_fds; j++) {
922                         int fd = rec->fds[j];
923                         close(fd);
924                 }
925                 rec->num_fds = 0;
926                 rec->fds = NULL;
927
928                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
929                        rec->src, &rec->buf);
930
931                 /*
932                  * we continue looking for matching messages after finding
933                  * one. This matters for subsystems like the internal notify
934                  * code which register more than one handler for the same
935                  * message type
936                  */
937         }
938
939         if (!messaging_append_new_waiters(msg_ctx)) {
940                 for (j=0; j < rec->num_fds; j++) {
941                         int fd = rec->fds[j];
942                         close(fd);
943                 }
944                 rec->num_fds = 0;
945                 rec->fds = NULL;
946                 return;
947         }
948
949         i = 0;
950         while (i < msg_ctx->num_waiters) {
951                 struct tevent_req *req;
952                 struct messaging_filtered_read_state *state;
953
954                 req = msg_ctx->waiters[i];
955                 if (req == NULL) {
956                         /*
957                          * This got cleaned up. In the meantime,
958                          * move everything down one. We need
959                          * to keep the order of waiters, as
960                          * other code may depend on this.
961                          */
962                         if (i < msg_ctx->num_waiters - 1) {
963                                 memmove(&msg_ctx->waiters[i],
964                                         &msg_ctx->waiters[i+1],
965                                         sizeof(struct tevent_req *) *
966                                             (msg_ctx->num_waiters - i - 1));
967                         }
968                         msg_ctx->num_waiters -= 1;
969                         continue;
970                 }
971
972                 state = tevent_req_data(
973                         req, struct messaging_filtered_read_state);
974                 if (state->filter(rec, state->private_data)) {
975                         messaging_filtered_read_done(req, rec);
976
977                         /*
978                          * Only the first one gets the fd-array
979                          */
980                         rec->num_fds = 0;
981                         rec->fds = NULL;
982                 }
983
984                 i += 1;
985         }
986
987         /*
988          * If the fd-array isn't used, just close it.
989          */
990         for (j=0; j < rec->num_fds; j++) {
991                 int fd = rec->fds[j];
992                 close(fd);
993         }
994         rec->num_fds = 0;
995         rec->fds = NULL;
996 }
997
998 static int mess_parent_dgm_cleanup(void *private_data);
999 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1000
1001 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1002 {
1003         struct tevent_req *req;
1004
1005         req = background_job_send(
1006                 msg, msg->event_ctx, msg, NULL, 0,
1007                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1008                             60*15),
1009                 mess_parent_dgm_cleanup, msg);
1010         if (req == NULL) {
1011                 return false;
1012         }
1013         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1014         return true;
1015 }
1016
1017 static int mess_parent_dgm_cleanup(void *private_data)
1018 {
1019         int ret;
1020
1021         ret = messaging_dgm_wipe();
1022         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1023                    ret ? strerror(ret) : "ok"));
1024         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1025                            60*15);
1026 }
1027
1028 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1029 {
1030         struct messaging_context *msg = tevent_req_callback_data(
1031                 req, struct messaging_context);
1032         NTSTATUS status;
1033
1034         status = background_job_recv(req);
1035         TALLOC_FREE(req);
1036         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1037                   nt_errstr(status)));
1038
1039         req = background_job_send(
1040                 msg, msg->event_ctx, msg, NULL, 0,
1041                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1042                             60*15),
1043                 mess_parent_dgm_cleanup, msg);
1044         if (req == NULL) {
1045                 DEBUG(1, ("background_job_send failed\n"));
1046                 return;
1047         }
1048         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1049 }
1050
1051 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1052 {
1053         int ret;
1054
1055         if (pid == 0) {
1056                 ret = messaging_dgm_wipe();
1057         } else {
1058                 ret = messaging_dgm_cleanup(pid);
1059         }
1060
1061         return ret;
1062 }
1063
1064 struct tevent_context *messaging_tevent_context(
1065         struct messaging_context *msg_ctx)
1066 {
1067         return msg_ctx->event_ctx;
1068 }
1069
1070 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1071 {
1072         return msg_ctx->names_db;
1073 }
1074
1075 /** @} **/