lib: Avoid a few casts
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
109                               int *fds, size_t num_fds,
110                               void *private_data)
111 {
112         struct messaging_context *msg_ctx = talloc_get_type_abort(
113                 private_data, struct messaging_context);
114         struct server_id_buf idbuf;
115         struct messaging_rec rec;
116         int64_t fds64[MIN(num_fds, INT8_MAX)];
117         size_t i;
118
119         if (msg_len < MESSAGE_HDR_LENGTH) {
120                 DBG_WARNING("message too short: %zu\n", msg_len);
121                 goto close_fail;
122         }
123
124         if (num_fds > INT8_MAX) {
125                 DBG_WARNING("too many fds: %zu\n", num_fds);
126                 goto close_fail;
127         }
128
129         /*
130          * "consume" the fds by copying them and setting
131          * the original variable to -1
132          */
133         for (i=0; i < num_fds; i++) {
134                 fds64[i] = fds[i];
135                 fds[i] = -1;
136         }
137
138         rec = (struct messaging_rec) {
139                 .msg_version = MESSAGE_VERSION,
140                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
141                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
142                 .num_fds = num_fds,
143                 .fds = fds64,
144         };
145
146         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
147
148         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
149                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
150                   server_id_str_buf(rec.src, &idbuf));
151
152         messaging_dispatch_rec(msg_ctx, &rec);
153         return;
154
155 close_fail:
156         for (i=0; i < num_fds; i++) {
157                 close(fds[i]);
158         }
159 }
160
161 static int messaging_context_destructor(struct messaging_context *ctx)
162 {
163         unsigned i;
164
165         for (i=0; i<ctx->num_new_waiters; i++) {
166                 if (ctx->new_waiters[i] != NULL) {
167                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
168                         ctx->new_waiters[i] = NULL;
169                 }
170         }
171         for (i=0; i<ctx->num_waiters; i++) {
172                 if (ctx->waiters[i] != NULL) {
173                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
174                         ctx->waiters[i] = NULL;
175                 }
176         }
177
178         return 0;
179 }
180
181 static const char *private_path(const char *name)
182 {
183         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
184 }
185
186 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
187                                          struct tevent_context *ev)
188 {
189         struct messaging_context *ctx;
190         int ret;
191         const char *lck_path;
192         const char *priv_path;
193         bool ok;
194
195         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
196                 return NULL;
197         }
198
199         ctx->id = (struct server_id) {
200                 .pid = getpid(), .vnn = NONCLUSTER_VNN
201         };
202
203         ctx->event_ctx = ev;
204
205         sec_init();
206
207         lck_path = lock_path("msg.lock");
208         if (lck_path == NULL) {
209                 TALLOC_FREE(ctx);
210                 return NULL;
211         }
212
213         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
214                                               0755);
215         if (!ok) {
216                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
217                            __func__, strerror(errno)));
218                 TALLOC_FREE(ctx);
219                 return NULL;
220         }
221
222         priv_path = private_path("msg.sock");
223         if (priv_path == NULL) {
224                 TALLOC_FREE(ctx);
225                 return NULL;
226         }
227
228         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
229                                               0700);
230         if (!ok) {
231                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
232                            __func__, strerror(errno)));
233                 TALLOC_FREE(ctx);
234                 return NULL;
235         }
236
237         ctx->msg_dgm_ref = messaging_dgm_ref(
238                 ctx, ctx->event_ctx, &ctx->id.unique_id,
239                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
240
241         if (ctx->msg_dgm_ref == NULL) {
242                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
243                 TALLOC_FREE(ctx);
244                 return NULL;
245         }
246
247         talloc_set_destructor(ctx, messaging_context_destructor);
248
249         if (lp_clustering()) {
250                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
251
252                 if (ret != 0) {
253                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
254                                   strerror(ret)));
255                         TALLOC_FREE(ctx);
256                         return NULL;
257                 }
258         }
259         ctx->id.vnn = get_my_vnn();
260
261         ctx->names_db = server_id_db_init(
262                 ctx, ctx->id, lp_lock_directory(), 0,
263                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
264         if (ctx->names_db == NULL) {
265                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
266                 TALLOC_FREE(ctx);
267                 return NULL;
268         }
269
270         messaging_register(ctx, NULL, MSG_PING, ping_message);
271
272         /* Register some debugging related messages */
273
274         register_msg_pool_usage(ctx);
275         register_dmalloc_msgs(ctx);
276         debug_register_msgs(ctx);
277
278         {
279                 struct server_id_buf tmp;
280                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
281         }
282
283         return ctx;
284 }
285
286 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
287 {
288         return msg_ctx->id;
289 }
290
291 /*
292  * re-init after a fork
293  */
294 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
295 {
296         int ret;
297         char *lck_path;
298
299         TALLOC_FREE(msg_ctx->msg_dgm_ref);
300
301         msg_ctx->id = (struct server_id) {
302                 .pid = getpid(), .vnn = msg_ctx->id.vnn
303         };
304
305         lck_path = lock_path("msg.lock");
306         if (lck_path == NULL) {
307                 return NT_STATUS_NO_MEMORY;
308         }
309
310         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
311                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
312                 private_path("msg.sock"), lck_path,
313                 messaging_recv_cb, msg_ctx, &ret);
314
315         if (msg_ctx->msg_dgm_ref == NULL) {
316                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
317                 return map_nt_error_from_unix(ret);
318         }
319
320         if (lp_clustering()) {
321                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
322                                              msg_ctx->remote);
323
324                 if (ret != 0) {
325                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
326                                   strerror(ret)));
327                         return map_nt_error_from_unix(ret);
328                 }
329         }
330
331         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
332
333         return NT_STATUS_OK;
334 }
335
336
337 /*
338  * Register a dispatch function for a particular message type. Allow multiple
339  * registrants
340 */
341 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
342                             void *private_data,
343                             uint32_t msg_type,
344                             void (*fn)(struct messaging_context *msg,
345                                        void *private_data, 
346                                        uint32_t msg_type, 
347                                        struct server_id server_id,
348                                        DATA_BLOB *data))
349 {
350         struct messaging_callback *cb;
351
352         DEBUG(5, ("Registering messaging pointer for type %u - "
353                   "private_data=%p\n",
354                   (unsigned)msg_type, private_data));
355
356         /*
357          * Only one callback per type
358          */
359
360         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
361                 /* we allow a second registration of the same message
362                    type if it has a different private pointer. This is
363                    needed in, for example, the internal notify code,
364                    which creates a new notify context for each tree
365                    connect, and expects to receive messages to each of
366                    them. */
367                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
368                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
369                                   (unsigned)msg_type, private_data));
370                         cb->fn = fn;
371                         cb->private_data = private_data;
372                         return NT_STATUS_OK;
373                 }
374         }
375
376         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
377                 return NT_STATUS_NO_MEMORY;
378         }
379
380         cb->msg_type = msg_type;
381         cb->fn = fn;
382         cb->private_data = private_data;
383
384         DLIST_ADD(msg_ctx->callbacks, cb);
385         return NT_STATUS_OK;
386 }
387
388 /*
389   De-register the function for a particular message type.
390 */
391 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
392                           void *private_data)
393 {
394         struct messaging_callback *cb, *next;
395
396         for (cb = ctx->callbacks; cb; cb = next) {
397                 next = cb->next;
398                 if ((cb->msg_type == msg_type)
399                     && (cb->private_data == private_data)) {
400                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
401                                   (unsigned)msg_type, private_data));
402                         DLIST_REMOVE(ctx->callbacks, cb);
403                         TALLOC_FREE(cb);
404                 }
405         }
406 }
407
408 /*
409   Send a message to a particular server
410 */
411 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
412                         struct server_id server, uint32_t msg_type,
413                         const DATA_BLOB *data)
414 {
415         struct iovec iov = {0};
416
417         if (data != NULL) {
418                 iov.iov_base = data->data;
419                 iov.iov_len = data->length;
420         };
421
422         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
423 }
424
425 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
426                             struct server_id server, uint32_t msg_type,
427                             const uint8_t *buf, size_t len)
428 {
429         DATA_BLOB blob = data_blob_const(buf, len);
430         return messaging_send(msg_ctx, server, msg_type, &blob);
431 }
432
433 int messaging_send_iov_from(struct messaging_context *msg_ctx,
434                             struct server_id src, struct server_id dst,
435                             uint32_t msg_type,
436                             const struct iovec *iov, int iovlen,
437                             const int *fds, size_t num_fds)
438 {
439         int ret;
440         uint8_t hdr[MESSAGE_HDR_LENGTH];
441         struct iovec iov2[iovlen+1];
442
443         if (server_id_is_disconnected(&dst)) {
444                 return EINVAL;
445         }
446
447         if (num_fds > INT8_MAX) {
448                 return EINVAL;
449         }
450
451         if (dst.vnn != msg_ctx->id.vnn) {
452                 if (num_fds > 0) {
453                         return ENOSYS;
454                 }
455
456                 ret = msg_ctx->remote->send_fn(src, dst,
457                                                msg_type, iov, iovlen,
458                                                NULL, 0,
459                                                msg_ctx->remote);
460                 return ret;
461         }
462
463         message_hdr_put(hdr, msg_type, src, dst);
464         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
465         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
466
467         become_root();
468         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
469         unbecome_root();
470
471         return ret;
472 }
473
474 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
475                             struct server_id dst, uint32_t msg_type,
476                             const struct iovec *iov, int iovlen,
477                             const int *fds, size_t num_fds)
478 {
479         int ret;
480
481         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
482                                       iov, iovlen, fds, num_fds);
483         if (ret != 0) {
484                 return map_nt_error_from_unix(ret);
485         }
486         return NT_STATUS_OK;
487 }
488
489 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
490                                                struct messaging_rec *rec)
491 {
492         struct messaging_rec *result;
493         size_t fds_size = sizeof(int64_t) * rec->num_fds;
494
495         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
496                                       rec->buf.length + fds_size);
497         if (result == NULL) {
498                 return NULL;
499         }
500         *result = *rec;
501
502         /* Doesn't fail, see talloc_pooled_object */
503
504         result->buf.data = talloc_memdup(result, rec->buf.data,
505                                          rec->buf.length);
506
507         result->fds = NULL;
508         if (result->num_fds > 0) {
509                 result->fds = talloc_memdup(result, rec->fds, fds_size);
510         }
511
512         return result;
513 }
514
515 struct messaging_filtered_read_state {
516         struct tevent_context *ev;
517         struct messaging_context *msg_ctx;
518         void *tevent_handle;
519
520         bool (*filter)(struct messaging_rec *rec, void *private_data);
521         void *private_data;
522
523         struct messaging_rec *rec;
524 };
525
526 static void messaging_filtered_read_cleanup(struct tevent_req *req,
527                                             enum tevent_req_state req_state);
528
529 struct tevent_req *messaging_filtered_read_send(
530         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
531         struct messaging_context *msg_ctx,
532         bool (*filter)(struct messaging_rec *rec, void *private_data),
533         void *private_data)
534 {
535         struct tevent_req *req;
536         struct messaging_filtered_read_state *state;
537         size_t new_waiters_len;
538
539         req = tevent_req_create(mem_ctx, &state,
540                                 struct messaging_filtered_read_state);
541         if (req == NULL) {
542                 return NULL;
543         }
544         state->ev = ev;
545         state->msg_ctx = msg_ctx;
546         state->filter = filter;
547         state->private_data = private_data;
548
549         /*
550          * We have to defer the callback here, as we might be called from
551          * within a different tevent_context than state->ev
552          */
553         tevent_req_defer_callback(req, state->ev);
554
555         state->tevent_handle = messaging_dgm_register_tevent_context(
556                 state, ev);
557         if (tevent_req_nomem(state->tevent_handle, req)) {
558                 return tevent_req_post(req, ev);
559         }
560
561         /*
562          * We add ourselves to the "new_waiters" array, not the "waiters"
563          * array. If we are called from within messaging_read_done,
564          * messaging_dispatch_rec will be in an active for-loop on
565          * "waiters". We must be careful not to mess with this array, because
566          * it could mean that a single event is being delivered twice.
567          */
568
569         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
570
571         if (new_waiters_len == msg_ctx->num_new_waiters) {
572                 struct tevent_req **tmp;
573
574                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
575                                      struct tevent_req *, new_waiters_len+1);
576                 if (tevent_req_nomem(tmp, req)) {
577                         return tevent_req_post(req, ev);
578                 }
579                 msg_ctx->new_waiters = tmp;
580         }
581
582         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
583         msg_ctx->num_new_waiters += 1;
584         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
585
586         return req;
587 }
588
589 static void messaging_filtered_read_cleanup(struct tevent_req *req,
590                                             enum tevent_req_state req_state)
591 {
592         struct messaging_filtered_read_state *state = tevent_req_data(
593                 req, struct messaging_filtered_read_state);
594         struct messaging_context *msg_ctx = state->msg_ctx;
595         unsigned i;
596
597         tevent_req_set_cleanup_fn(req, NULL);
598
599         TALLOC_FREE(state->tevent_handle);
600
601         /*
602          * Just set the [new_]waiters entry to NULL, be careful not to mess
603          * with the other "waiters" array contents. We are often called from
604          * within "messaging_dispatch_rec", which loops over
605          * "waiters". Messing with the "waiters" array will mess up that
606          * for-loop.
607          */
608
609         for (i=0; i<msg_ctx->num_waiters; i++) {
610                 if (msg_ctx->waiters[i] == req) {
611                         msg_ctx->waiters[i] = NULL;
612                         return;
613                 }
614         }
615
616         for (i=0; i<msg_ctx->num_new_waiters; i++) {
617                 if (msg_ctx->new_waiters[i] == req) {
618                         msg_ctx->new_waiters[i] = NULL;
619                         return;
620                 }
621         }
622 }
623
624 static void messaging_filtered_read_done(struct tevent_req *req,
625                                          struct messaging_rec *rec)
626 {
627         struct messaging_filtered_read_state *state = tevent_req_data(
628                 req, struct messaging_filtered_read_state);
629
630         state->rec = messaging_rec_dup(state, rec);
631         if (tevent_req_nomem(state->rec, req)) {
632                 return;
633         }
634         tevent_req_done(req);
635 }
636
637 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
638                                  struct messaging_rec **presult)
639 {
640         struct messaging_filtered_read_state *state = tevent_req_data(
641                 req, struct messaging_filtered_read_state);
642         int err;
643
644         if (tevent_req_is_unix_error(req, &err)) {
645                 tevent_req_received(req);
646                 return err;
647         }
648         *presult = talloc_move(mem_ctx, &state->rec);
649         return 0;
650 }
651
652 struct messaging_read_state {
653         uint32_t msg_type;
654         struct messaging_rec *rec;
655 };
656
657 static bool messaging_read_filter(struct messaging_rec *rec,
658                                   void *private_data);
659 static void messaging_read_done(struct tevent_req *subreq);
660
661 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
662                                        struct tevent_context *ev,
663                                        struct messaging_context *msg,
664                                        uint32_t msg_type)
665 {
666         struct tevent_req *req, *subreq;
667         struct messaging_read_state *state;
668
669         req = tevent_req_create(mem_ctx, &state,
670                                 struct messaging_read_state);
671         if (req == NULL) {
672                 return NULL;
673         }
674         state->msg_type = msg_type;
675
676         subreq = messaging_filtered_read_send(state, ev, msg,
677                                               messaging_read_filter, state);
678         if (tevent_req_nomem(subreq, req)) {
679                 return tevent_req_post(req, ev);
680         }
681         tevent_req_set_callback(subreq, messaging_read_done, req);
682         return req;
683 }
684
685 static bool messaging_read_filter(struct messaging_rec *rec,
686                                   void *private_data)
687 {
688         struct messaging_read_state *state = talloc_get_type_abort(
689                 private_data, struct messaging_read_state);
690
691         if (rec->num_fds != 0) {
692                 return false;
693         }
694
695         return rec->msg_type == state->msg_type;
696 }
697
698 static void messaging_read_done(struct tevent_req *subreq)
699 {
700         struct tevent_req *req = tevent_req_callback_data(
701                 subreq, struct tevent_req);
702         struct messaging_read_state *state = tevent_req_data(
703                 req, struct messaging_read_state);
704         int ret;
705
706         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
707         TALLOC_FREE(subreq);
708         if (tevent_req_error(req, ret)) {
709                 return;
710         }
711         tevent_req_done(req);
712 }
713
714 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
715                         struct messaging_rec **presult)
716 {
717         struct messaging_read_state *state = tevent_req_data(
718                 req, struct messaging_read_state);
719         int err;
720
721         if (tevent_req_is_unix_error(req, &err)) {
722                 return err;
723         }
724         if (presult != NULL) {
725                 *presult = talloc_move(mem_ctx, &state->rec);
726         }
727         return 0;
728 }
729
730 struct messaging_handler_state {
731         struct tevent_context *ev;
732         struct messaging_context *msg_ctx;
733         uint32_t msg_type;
734         bool (*handler)(struct messaging_context *msg_ctx,
735                         struct messaging_rec **rec, void *private_data);
736         void *private_data;
737 };
738
739 static void messaging_handler_got_msg(struct tevent_req *subreq);
740
741 struct tevent_req *messaging_handler_send(
742         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
743         struct messaging_context *msg_ctx, uint32_t msg_type,
744         bool (*handler)(struct messaging_context *msg_ctx,
745                         struct messaging_rec **rec, void *private_data),
746         void *private_data)
747 {
748         struct tevent_req *req, *subreq;
749         struct messaging_handler_state *state;
750
751         req = tevent_req_create(mem_ctx, &state,
752                                 struct messaging_handler_state);
753         if (req == NULL) {
754                 return NULL;
755         }
756         state->ev = ev;
757         state->msg_ctx = msg_ctx;
758         state->msg_type = msg_type;
759         state->handler = handler;
760         state->private_data = private_data;
761
762         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
763                                      state->msg_type);
764         if (tevent_req_nomem(subreq, req)) {
765                 return tevent_req_post(req, ev);
766         }
767         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
768         return req;
769 }
770
771 static void messaging_handler_got_msg(struct tevent_req *subreq)
772 {
773         struct tevent_req *req = tevent_req_callback_data(
774                 subreq, struct tevent_req);
775         struct messaging_handler_state *state = tevent_req_data(
776                 req, struct messaging_handler_state);
777         struct messaging_rec *rec;
778         int ret;
779         bool ok;
780
781         ret = messaging_read_recv(subreq, state, &rec);
782         TALLOC_FREE(subreq);
783         if (tevent_req_error(req, ret)) {
784                 return;
785         }
786
787         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
788                                      state->msg_type);
789         if (tevent_req_nomem(subreq, req)) {
790                 return;
791         }
792         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
793
794         ok = state->handler(state->msg_ctx, &rec, state->private_data);
795         TALLOC_FREE(rec);
796         if (ok) {
797                 /*
798                  * Next round
799                  */
800                 return;
801         }
802         TALLOC_FREE(subreq);
803         tevent_req_done(req);
804 }
805
806 int messaging_handler_recv(struct tevent_req *req)
807 {
808         return tevent_req_simple_recv_unix(req);
809 }
810
811 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
812 {
813         if (msg_ctx->num_new_waiters == 0) {
814                 return true;
815         }
816
817         if (talloc_array_length(msg_ctx->waiters) <
818             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
819                 struct tevent_req **tmp;
820                 tmp = talloc_realloc(
821                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
822                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
823                 if (tmp == NULL) {
824                         DEBUG(1, ("%s: talloc failed\n", __func__));
825                         return false;
826                 }
827                 msg_ctx->waiters = tmp;
828         }
829
830         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
831                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
832
833         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
834         msg_ctx->num_new_waiters = 0;
835
836         return true;
837 }
838
839 /*
840   Dispatch one messaging_rec
841 */
842 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
843                                    struct messaging_rec *rec)
844 {
845         struct messaging_callback *cb, *next;
846         unsigned i;
847         size_t j;
848
849         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
850                 next = cb->next;
851                 if (cb->msg_type != rec->msg_type) {
852                         continue;
853                 }
854
855                 /*
856                  * the old style callbacks don't support fd passing
857                  */
858                 for (j=0; j < rec->num_fds; j++) {
859                         int fd = rec->fds[j];
860                         close(fd);
861                 }
862                 rec->num_fds = 0;
863                 rec->fds = NULL;
864
865                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
866                        rec->src, &rec->buf);
867
868                 /*
869                  * we continue looking for matching messages after finding
870                  * one. This matters for subsystems like the internal notify
871                  * code which register more than one handler for the same
872                  * message type
873                  */
874         }
875
876         if (!messaging_append_new_waiters(msg_ctx)) {
877                 for (j=0; j < rec->num_fds; j++) {
878                         int fd = rec->fds[j];
879                         close(fd);
880                 }
881                 rec->num_fds = 0;
882                 rec->fds = NULL;
883                 return;
884         }
885
886         i = 0;
887         while (i < msg_ctx->num_waiters) {
888                 struct tevent_req *req;
889                 struct messaging_filtered_read_state *state;
890
891                 req = msg_ctx->waiters[i];
892                 if (req == NULL) {
893                         /*
894                          * This got cleaned up. In the meantime,
895                          * move everything down one. We need
896                          * to keep the order of waiters, as
897                          * other code may depend on this.
898                          */
899                         if (i < msg_ctx->num_waiters - 1) {
900                                 memmove(&msg_ctx->waiters[i],
901                                         &msg_ctx->waiters[i+1],
902                                         sizeof(struct tevent_req *) *
903                                             (msg_ctx->num_waiters - i - 1));
904                         }
905                         msg_ctx->num_waiters -= 1;
906                         continue;
907                 }
908
909                 state = tevent_req_data(
910                         req, struct messaging_filtered_read_state);
911                 if (state->filter(rec, state->private_data)) {
912                         messaging_filtered_read_done(req, rec);
913
914                         /*
915                          * Only the first one gets the fd-array
916                          */
917                         rec->num_fds = 0;
918                         rec->fds = NULL;
919                 }
920
921                 i += 1;
922         }
923
924         /*
925          * If the fd-array isn't used, just close it.
926          */
927         for (j=0; j < rec->num_fds; j++) {
928                 int fd = rec->fds[j];
929                 close(fd);
930         }
931         rec->num_fds = 0;
932         rec->fds = NULL;
933 }
934
935 static int mess_parent_dgm_cleanup(void *private_data);
936 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
937
938 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
939 {
940         struct tevent_req *req;
941
942         req = background_job_send(
943                 msg, msg->event_ctx, msg, NULL, 0,
944                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
945                             60*15),
946                 mess_parent_dgm_cleanup, msg);
947         if (req == NULL) {
948                 return false;
949         }
950         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
951         return true;
952 }
953
954 static int mess_parent_dgm_cleanup(void *private_data)
955 {
956         int ret;
957
958         ret = messaging_dgm_wipe();
959         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
960                    ret ? strerror(ret) : "ok"));
961         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
962                            60*15);
963 }
964
965 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
966 {
967         struct messaging_context *msg = tevent_req_callback_data(
968                 req, struct messaging_context);
969         NTSTATUS status;
970
971         status = background_job_recv(req);
972         TALLOC_FREE(req);
973         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
974                   nt_errstr(status)));
975
976         req = background_job_send(
977                 msg, msg->event_ctx, msg, NULL, 0,
978                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
979                             60*15),
980                 mess_parent_dgm_cleanup, msg);
981         if (req == NULL) {
982                 DEBUG(1, ("background_job_send failed\n"));
983                 return;
984         }
985         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
986 }
987
988 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
989 {
990         int ret;
991
992         if (pid == 0) {
993                 ret = messaging_dgm_wipe();
994         } else {
995                 ret = messaging_dgm_cleanup(pid);
996         }
997
998         return ret;
999 }
1000
1001 struct tevent_context *messaging_tevent_context(
1002         struct messaging_context *msg_ctx)
1003 {
1004         return msg_ctx->event_ctx;
1005 }
1006
1007 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1008 {
1009         return msg_ctx->names_db;
1010 }
1011
1012 /** @} **/