messaging: Add wrap check to messaging_rec_dup
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "dbwrap/dbwrap.h"
50 #include "serverid.h"
51 #include "messages.h"
52 #include "lib/util/tevent_unix.h"
53 #include "lib/background.h"
54 #include "lib/messages_dgm.h"
55 #include "lib/util/iov_buf.h"
56 #include "lib/util/server_id_db.h"
57 #include "lib/messages_dgm_ref.h"
58 #include "lib/messages_util.h"
59
60 struct messaging_callback {
61         struct messaging_callback *prev, *next;
62         uint32_t msg_type;
63         void (*fn)(struct messaging_context *msg, void *private_data, 
64                    uint32_t msg_type, 
65                    struct server_id server_id, DATA_BLOB *data);
66         void *private_data;
67 };
68
69 struct messaging_context {
70         struct server_id id;
71         struct tevent_context *event_ctx;
72         struct messaging_callback *callbacks;
73
74         struct tevent_req **new_waiters;
75         unsigned num_new_waiters;
76
77         struct tevent_req **waiters;
78         unsigned num_waiters;
79
80         void *msg_dgm_ref;
81         struct messaging_backend *remote;
82
83         struct server_id_db *names_db;
84 };
85
86 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
87                                    struct messaging_rec *rec);
88
89 /****************************************************************************
90  A useful function for testing the message system.
91 ****************************************************************************/
92
93 static void ping_message(struct messaging_context *msg_ctx,
94                          void *private_data,
95                          uint32_t msg_type,
96                          struct server_id src,
97                          DATA_BLOB *data)
98 {
99         struct server_id_buf idbuf;
100
101         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
102                   server_id_str_buf(src, &idbuf), (int)data->length,
103                   data->data ? (char *)data->data : ""));
104
105         messaging_send(msg_ctx, src, MSG_PONG, data);
106 }
107
108 static void messaging_recv_cb(const uint8_t *msg, size_t msg_len,
109                               int *fds, size_t num_fds,
110                               void *private_data)
111 {
112         struct messaging_context *msg_ctx = talloc_get_type_abort(
113                 private_data, struct messaging_context);
114         struct server_id_buf idbuf;
115         struct messaging_rec rec;
116         int64_t fds64[MIN(num_fds, INT8_MAX)];
117         size_t i;
118
119         if (msg_len < MESSAGE_HDR_LENGTH) {
120                 DBG_WARNING("message too short: %zu\n", msg_len);
121                 goto close_fail;
122         }
123
124         if (num_fds > INT8_MAX) {
125                 DBG_WARNING("too many fds: %zu\n", num_fds);
126                 goto close_fail;
127         }
128
129         /*
130          * "consume" the fds by copying them and setting
131          * the original variable to -1
132          */
133         for (i=0; i < num_fds; i++) {
134                 fds64[i] = fds[i];
135                 fds[i] = -1;
136         }
137
138         rec = (struct messaging_rec) {
139                 .msg_version = MESSAGE_VERSION,
140                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
141                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
142                 .num_fds = num_fds,
143                 .fds = fds64,
144         };
145
146         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
147
148         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
149                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
150                   server_id_str_buf(rec.src, &idbuf));
151
152         messaging_dispatch_rec(msg_ctx, &rec);
153         return;
154
155 close_fail:
156         for (i=0; i < num_fds; i++) {
157                 close(fds[i]);
158         }
159 }
160
161 static int messaging_context_destructor(struct messaging_context *ctx)
162 {
163         unsigned i;
164
165         for (i=0; i<ctx->num_new_waiters; i++) {
166                 if (ctx->new_waiters[i] != NULL) {
167                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
168                         ctx->new_waiters[i] = NULL;
169                 }
170         }
171         for (i=0; i<ctx->num_waiters; i++) {
172                 if (ctx->waiters[i] != NULL) {
173                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
174                         ctx->waiters[i] = NULL;
175                 }
176         }
177
178         return 0;
179 }
180
181 static const char *private_path(const char *name)
182 {
183         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
184 }
185
186 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx, 
187                                          struct tevent_context *ev)
188 {
189         struct messaging_context *ctx;
190         int ret;
191         const char *lck_path;
192         const char *priv_path;
193         bool ok;
194
195         if (!(ctx = talloc_zero(mem_ctx, struct messaging_context))) {
196                 return NULL;
197         }
198
199         ctx->id = (struct server_id) {
200                 .pid = getpid(), .vnn = NONCLUSTER_VNN
201         };
202
203         ctx->event_ctx = ev;
204
205         sec_init();
206
207         lck_path = lock_path("msg.lock");
208         if (lck_path == NULL) {
209                 TALLOC_FREE(ctx);
210                 return NULL;
211         }
212
213         ok = directory_create_or_exist_strict(lck_path, sec_initial_uid(),
214                                               0755);
215         if (!ok) {
216                 DEBUG(10, ("%s: Could not create lock directory: %s\n",
217                            __func__, strerror(errno)));
218                 TALLOC_FREE(ctx);
219                 return NULL;
220         }
221
222         priv_path = private_path("msg.sock");
223         if (priv_path == NULL) {
224                 TALLOC_FREE(ctx);
225                 return NULL;
226         }
227
228         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
229                                               0700);
230         if (!ok) {
231                 DEBUG(10, ("%s: Could not create msg directory: %s\n",
232                            __func__, strerror(errno)));
233                 TALLOC_FREE(ctx);
234                 return NULL;
235         }
236
237         ctx->msg_dgm_ref = messaging_dgm_ref(
238                 ctx, ctx->event_ctx, &ctx->id.unique_id,
239                 priv_path, lck_path, messaging_recv_cb, ctx, &ret);
240
241         if (ctx->msg_dgm_ref == NULL) {
242                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
243                 TALLOC_FREE(ctx);
244                 return NULL;
245         }
246
247         talloc_set_destructor(ctx, messaging_context_destructor);
248
249         if (lp_clustering()) {
250                 ret = messaging_ctdbd_init(ctx, ctx, &ctx->remote);
251
252                 if (ret != 0) {
253                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
254                                   strerror(ret)));
255                         TALLOC_FREE(ctx);
256                         return NULL;
257                 }
258         }
259         ctx->id.vnn = get_my_vnn();
260
261         ctx->names_db = server_id_db_init(
262                 ctx, ctx->id, lp_lock_directory(), 0,
263                 TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
264         if (ctx->names_db == NULL) {
265                 DEBUG(10, ("%s: server_id_db_init failed\n", __func__));
266                 TALLOC_FREE(ctx);
267                 return NULL;
268         }
269
270         messaging_register(ctx, NULL, MSG_PING, ping_message);
271
272         /* Register some debugging related messages */
273
274         register_msg_pool_usage(ctx);
275         register_dmalloc_msgs(ctx);
276         debug_register_msgs(ctx);
277
278         {
279                 struct server_id_buf tmp;
280                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
281         }
282
283         return ctx;
284 }
285
286 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
287 {
288         return msg_ctx->id;
289 }
290
291 /*
292  * re-init after a fork
293  */
294 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
295 {
296         int ret;
297         char *lck_path;
298
299         TALLOC_FREE(msg_ctx->msg_dgm_ref);
300
301         msg_ctx->id = (struct server_id) {
302                 .pid = getpid(), .vnn = msg_ctx->id.vnn
303         };
304
305         lck_path = lock_path("msg.lock");
306         if (lck_path == NULL) {
307                 return NT_STATUS_NO_MEMORY;
308         }
309
310         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
311                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
312                 private_path("msg.sock"), lck_path,
313                 messaging_recv_cb, msg_ctx, &ret);
314
315         if (msg_ctx->msg_dgm_ref == NULL) {
316                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
317                 return map_nt_error_from_unix(ret);
318         }
319
320         if (lp_clustering()) {
321                 ret = messaging_ctdbd_reinit(msg_ctx, msg_ctx,
322                                              msg_ctx->remote);
323
324                 if (ret != 0) {
325                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
326                                   strerror(ret)));
327                         return map_nt_error_from_unix(ret);
328                 }
329         }
330
331         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
332
333         return NT_STATUS_OK;
334 }
335
336
337 /*
338  * Register a dispatch function for a particular message type. Allow multiple
339  * registrants
340 */
341 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
342                             void *private_data,
343                             uint32_t msg_type,
344                             void (*fn)(struct messaging_context *msg,
345                                        void *private_data, 
346                                        uint32_t msg_type, 
347                                        struct server_id server_id,
348                                        DATA_BLOB *data))
349 {
350         struct messaging_callback *cb;
351
352         DEBUG(5, ("Registering messaging pointer for type %u - "
353                   "private_data=%p\n",
354                   (unsigned)msg_type, private_data));
355
356         /*
357          * Only one callback per type
358          */
359
360         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
361                 /* we allow a second registration of the same message
362                    type if it has a different private pointer. This is
363                    needed in, for example, the internal notify code,
364                    which creates a new notify context for each tree
365                    connect, and expects to receive messages to each of
366                    them. */
367                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
368                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
369                                   (unsigned)msg_type, private_data));
370                         cb->fn = fn;
371                         cb->private_data = private_data;
372                         return NT_STATUS_OK;
373                 }
374         }
375
376         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
377                 return NT_STATUS_NO_MEMORY;
378         }
379
380         cb->msg_type = msg_type;
381         cb->fn = fn;
382         cb->private_data = private_data;
383
384         DLIST_ADD(msg_ctx->callbacks, cb);
385         return NT_STATUS_OK;
386 }
387
388 /*
389   De-register the function for a particular message type.
390 */
391 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
392                           void *private_data)
393 {
394         struct messaging_callback *cb, *next;
395
396         for (cb = ctx->callbacks; cb; cb = next) {
397                 next = cb->next;
398                 if ((cb->msg_type == msg_type)
399                     && (cb->private_data == private_data)) {
400                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
401                                   (unsigned)msg_type, private_data));
402                         DLIST_REMOVE(ctx->callbacks, cb);
403                         TALLOC_FREE(cb);
404                 }
405         }
406 }
407
408 /*
409   Send a message to a particular server
410 */
411 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
412                         struct server_id server, uint32_t msg_type,
413                         const DATA_BLOB *data)
414 {
415         struct iovec iov = {0};
416
417         if (data != NULL) {
418                 iov.iov_base = data->data;
419                 iov.iov_len = data->length;
420         };
421
422         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
423 }
424
425 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
426                             struct server_id server, uint32_t msg_type,
427                             const uint8_t *buf, size_t len)
428 {
429         DATA_BLOB blob = data_blob_const(buf, len);
430         return messaging_send(msg_ctx, server, msg_type, &blob);
431 }
432
433 int messaging_send_iov_from(struct messaging_context *msg_ctx,
434                             struct server_id src, struct server_id dst,
435                             uint32_t msg_type,
436                             const struct iovec *iov, int iovlen,
437                             const int *fds, size_t num_fds)
438 {
439         int ret;
440         uint8_t hdr[MESSAGE_HDR_LENGTH];
441         struct iovec iov2[iovlen+1];
442
443         if (server_id_is_disconnected(&dst)) {
444                 return EINVAL;
445         }
446
447         if (num_fds > INT8_MAX) {
448                 return EINVAL;
449         }
450
451         if (dst.vnn != msg_ctx->id.vnn) {
452                 if (num_fds > 0) {
453                         return ENOSYS;
454                 }
455
456                 ret = msg_ctx->remote->send_fn(src, dst,
457                                                msg_type, iov, iovlen,
458                                                NULL, 0,
459                                                msg_ctx->remote);
460                 return ret;
461         }
462
463         message_hdr_put(hdr, msg_type, src, dst);
464         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
465         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
466
467         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
468
469         if (ret == EACCES) {
470                 become_root();
471                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
472                                          fds, num_fds);
473                 unbecome_root();
474         }
475
476         return ret;
477 }
478
479 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
480                             struct server_id dst, uint32_t msg_type,
481                             const struct iovec *iov, int iovlen,
482                             const int *fds, size_t num_fds)
483 {
484         int ret;
485
486         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
487                                       iov, iovlen, fds, num_fds);
488         if (ret != 0) {
489                 return map_nt_error_from_unix(ret);
490         }
491         return NT_STATUS_OK;
492 }
493
494 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
495                                                struct messaging_rec *rec)
496 {
497         struct messaging_rec *result;
498         size_t fds_size = sizeof(int64_t) * rec->num_fds;
499         size_t payload_len;
500
501         payload_len = rec->buf.length + fds_size;
502         if (payload_len < rec->buf.length) {
503                 /* overflow */
504                 return NULL;
505         }
506
507         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
508                                       payload_len);
509         if (result == NULL) {
510                 return NULL;
511         }
512         *result = *rec;
513
514         /* Doesn't fail, see talloc_pooled_object */
515
516         result->buf.data = talloc_memdup(result, rec->buf.data,
517                                          rec->buf.length);
518
519         result->fds = NULL;
520         if (result->num_fds > 0) {
521                 result->fds = talloc_memdup(result, rec->fds, fds_size);
522         }
523
524         return result;
525 }
526
527 struct messaging_filtered_read_state {
528         struct tevent_context *ev;
529         struct messaging_context *msg_ctx;
530         void *tevent_handle;
531
532         bool (*filter)(struct messaging_rec *rec, void *private_data);
533         void *private_data;
534
535         struct messaging_rec *rec;
536 };
537
538 static void messaging_filtered_read_cleanup(struct tevent_req *req,
539                                             enum tevent_req_state req_state);
540
541 struct tevent_req *messaging_filtered_read_send(
542         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
543         struct messaging_context *msg_ctx,
544         bool (*filter)(struct messaging_rec *rec, void *private_data),
545         void *private_data)
546 {
547         struct tevent_req *req;
548         struct messaging_filtered_read_state *state;
549         size_t new_waiters_len;
550
551         req = tevent_req_create(mem_ctx, &state,
552                                 struct messaging_filtered_read_state);
553         if (req == NULL) {
554                 return NULL;
555         }
556         state->ev = ev;
557         state->msg_ctx = msg_ctx;
558         state->filter = filter;
559         state->private_data = private_data;
560
561         /*
562          * We have to defer the callback here, as we might be called from
563          * within a different tevent_context than state->ev
564          */
565         tevent_req_defer_callback(req, state->ev);
566
567         state->tevent_handle = messaging_dgm_register_tevent_context(
568                 state, ev);
569         if (tevent_req_nomem(state->tevent_handle, req)) {
570                 return tevent_req_post(req, ev);
571         }
572
573         /*
574          * We add ourselves to the "new_waiters" array, not the "waiters"
575          * array. If we are called from within messaging_read_done,
576          * messaging_dispatch_rec will be in an active for-loop on
577          * "waiters". We must be careful not to mess with this array, because
578          * it could mean that a single event is being delivered twice.
579          */
580
581         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
582
583         if (new_waiters_len == msg_ctx->num_new_waiters) {
584                 struct tevent_req **tmp;
585
586                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
587                                      struct tevent_req *, new_waiters_len+1);
588                 if (tevent_req_nomem(tmp, req)) {
589                         return tevent_req_post(req, ev);
590                 }
591                 msg_ctx->new_waiters = tmp;
592         }
593
594         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
595         msg_ctx->num_new_waiters += 1;
596         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
597
598         return req;
599 }
600
601 static void messaging_filtered_read_cleanup(struct tevent_req *req,
602                                             enum tevent_req_state req_state)
603 {
604         struct messaging_filtered_read_state *state = tevent_req_data(
605                 req, struct messaging_filtered_read_state);
606         struct messaging_context *msg_ctx = state->msg_ctx;
607         unsigned i;
608
609         tevent_req_set_cleanup_fn(req, NULL);
610
611         TALLOC_FREE(state->tevent_handle);
612
613         /*
614          * Just set the [new_]waiters entry to NULL, be careful not to mess
615          * with the other "waiters" array contents. We are often called from
616          * within "messaging_dispatch_rec", which loops over
617          * "waiters". Messing with the "waiters" array will mess up that
618          * for-loop.
619          */
620
621         for (i=0; i<msg_ctx->num_waiters; i++) {
622                 if (msg_ctx->waiters[i] == req) {
623                         msg_ctx->waiters[i] = NULL;
624                         return;
625                 }
626         }
627
628         for (i=0; i<msg_ctx->num_new_waiters; i++) {
629                 if (msg_ctx->new_waiters[i] == req) {
630                         msg_ctx->new_waiters[i] = NULL;
631                         return;
632                 }
633         }
634 }
635
636 static void messaging_filtered_read_done(struct tevent_req *req,
637                                          struct messaging_rec *rec)
638 {
639         struct messaging_filtered_read_state *state = tevent_req_data(
640                 req, struct messaging_filtered_read_state);
641
642         state->rec = messaging_rec_dup(state, rec);
643         if (tevent_req_nomem(state->rec, req)) {
644                 return;
645         }
646         tevent_req_done(req);
647 }
648
649 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
650                                  struct messaging_rec **presult)
651 {
652         struct messaging_filtered_read_state *state = tevent_req_data(
653                 req, struct messaging_filtered_read_state);
654         int err;
655
656         if (tevent_req_is_unix_error(req, &err)) {
657                 tevent_req_received(req);
658                 return err;
659         }
660         if (presult != NULL) {
661                 *presult = talloc_move(mem_ctx, &state->rec);
662         }
663         return 0;
664 }
665
666 struct messaging_read_state {
667         uint32_t msg_type;
668         struct messaging_rec *rec;
669 };
670
671 static bool messaging_read_filter(struct messaging_rec *rec,
672                                   void *private_data);
673 static void messaging_read_done(struct tevent_req *subreq);
674
675 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
676                                        struct tevent_context *ev,
677                                        struct messaging_context *msg,
678                                        uint32_t msg_type)
679 {
680         struct tevent_req *req, *subreq;
681         struct messaging_read_state *state;
682
683         req = tevent_req_create(mem_ctx, &state,
684                                 struct messaging_read_state);
685         if (req == NULL) {
686                 return NULL;
687         }
688         state->msg_type = msg_type;
689
690         subreq = messaging_filtered_read_send(state, ev, msg,
691                                               messaging_read_filter, state);
692         if (tevent_req_nomem(subreq, req)) {
693                 return tevent_req_post(req, ev);
694         }
695         tevent_req_set_callback(subreq, messaging_read_done, req);
696         return req;
697 }
698
699 static bool messaging_read_filter(struct messaging_rec *rec,
700                                   void *private_data)
701 {
702         struct messaging_read_state *state = talloc_get_type_abort(
703                 private_data, struct messaging_read_state);
704
705         if (rec->num_fds != 0) {
706                 return false;
707         }
708
709         return rec->msg_type == state->msg_type;
710 }
711
712 static void messaging_read_done(struct tevent_req *subreq)
713 {
714         struct tevent_req *req = tevent_req_callback_data(
715                 subreq, struct tevent_req);
716         struct messaging_read_state *state = tevent_req_data(
717                 req, struct messaging_read_state);
718         int ret;
719
720         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
721         TALLOC_FREE(subreq);
722         if (tevent_req_error(req, ret)) {
723                 return;
724         }
725         tevent_req_done(req);
726 }
727
728 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
729                         struct messaging_rec **presult)
730 {
731         struct messaging_read_state *state = tevent_req_data(
732                 req, struct messaging_read_state);
733         int err;
734
735         if (tevent_req_is_unix_error(req, &err)) {
736                 return err;
737         }
738         if (presult != NULL) {
739                 *presult = talloc_move(mem_ctx, &state->rec);
740         }
741         return 0;
742 }
743
744 struct messaging_handler_state {
745         struct tevent_context *ev;
746         struct messaging_context *msg_ctx;
747         uint32_t msg_type;
748         bool (*handler)(struct messaging_context *msg_ctx,
749                         struct messaging_rec **rec, void *private_data);
750         void *private_data;
751 };
752
753 static void messaging_handler_got_msg(struct tevent_req *subreq);
754
755 struct tevent_req *messaging_handler_send(
756         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
757         struct messaging_context *msg_ctx, uint32_t msg_type,
758         bool (*handler)(struct messaging_context *msg_ctx,
759                         struct messaging_rec **rec, void *private_data),
760         void *private_data)
761 {
762         struct tevent_req *req, *subreq;
763         struct messaging_handler_state *state;
764
765         req = tevent_req_create(mem_ctx, &state,
766                                 struct messaging_handler_state);
767         if (req == NULL) {
768                 return NULL;
769         }
770         state->ev = ev;
771         state->msg_ctx = msg_ctx;
772         state->msg_type = msg_type;
773         state->handler = handler;
774         state->private_data = private_data;
775
776         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
777                                      state->msg_type);
778         if (tevent_req_nomem(subreq, req)) {
779                 return tevent_req_post(req, ev);
780         }
781         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
782         return req;
783 }
784
785 static void messaging_handler_got_msg(struct tevent_req *subreq)
786 {
787         struct tevent_req *req = tevent_req_callback_data(
788                 subreq, struct tevent_req);
789         struct messaging_handler_state *state = tevent_req_data(
790                 req, struct messaging_handler_state);
791         struct messaging_rec *rec;
792         int ret;
793         bool ok;
794
795         ret = messaging_read_recv(subreq, state, &rec);
796         TALLOC_FREE(subreq);
797         if (tevent_req_error(req, ret)) {
798                 return;
799         }
800
801         subreq = messaging_read_send(state, state->ev, state->msg_ctx,
802                                      state->msg_type);
803         if (tevent_req_nomem(subreq, req)) {
804                 return;
805         }
806         tevent_req_set_callback(subreq, messaging_handler_got_msg, req);
807
808         ok = state->handler(state->msg_ctx, &rec, state->private_data);
809         TALLOC_FREE(rec);
810         if (ok) {
811                 /*
812                  * Next round
813                  */
814                 return;
815         }
816         TALLOC_FREE(subreq);
817         tevent_req_done(req);
818 }
819
820 int messaging_handler_recv(struct tevent_req *req)
821 {
822         return tevent_req_simple_recv_unix(req);
823 }
824
825 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
826 {
827         if (msg_ctx->num_new_waiters == 0) {
828                 return true;
829         }
830
831         if (talloc_array_length(msg_ctx->waiters) <
832             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
833                 struct tevent_req **tmp;
834                 tmp = talloc_realloc(
835                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
836                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
837                 if (tmp == NULL) {
838                         DEBUG(1, ("%s: talloc failed\n", __func__));
839                         return false;
840                 }
841                 msg_ctx->waiters = tmp;
842         }
843
844         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
845                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
846
847         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
848         msg_ctx->num_new_waiters = 0;
849
850         return true;
851 }
852
853 /*
854   Dispatch one messaging_rec
855 */
856 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
857                                    struct messaging_rec *rec)
858 {
859         struct messaging_callback *cb, *next;
860         unsigned i;
861         size_t j;
862
863         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
864                 next = cb->next;
865                 if (cb->msg_type != rec->msg_type) {
866                         continue;
867                 }
868
869                 /*
870                  * the old style callbacks don't support fd passing
871                  */
872                 for (j=0; j < rec->num_fds; j++) {
873                         int fd = rec->fds[j];
874                         close(fd);
875                 }
876                 rec->num_fds = 0;
877                 rec->fds = NULL;
878
879                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
880                        rec->src, &rec->buf);
881
882                 /*
883                  * we continue looking for matching messages after finding
884                  * one. This matters for subsystems like the internal notify
885                  * code which register more than one handler for the same
886                  * message type
887                  */
888         }
889
890         if (!messaging_append_new_waiters(msg_ctx)) {
891                 for (j=0; j < rec->num_fds; j++) {
892                         int fd = rec->fds[j];
893                         close(fd);
894                 }
895                 rec->num_fds = 0;
896                 rec->fds = NULL;
897                 return;
898         }
899
900         i = 0;
901         while (i < msg_ctx->num_waiters) {
902                 struct tevent_req *req;
903                 struct messaging_filtered_read_state *state;
904
905                 req = msg_ctx->waiters[i];
906                 if (req == NULL) {
907                         /*
908                          * This got cleaned up. In the meantime,
909                          * move everything down one. We need
910                          * to keep the order of waiters, as
911                          * other code may depend on this.
912                          */
913                         if (i < msg_ctx->num_waiters - 1) {
914                                 memmove(&msg_ctx->waiters[i],
915                                         &msg_ctx->waiters[i+1],
916                                         sizeof(struct tevent_req *) *
917                                             (msg_ctx->num_waiters - i - 1));
918                         }
919                         msg_ctx->num_waiters -= 1;
920                         continue;
921                 }
922
923                 state = tevent_req_data(
924                         req, struct messaging_filtered_read_state);
925                 if (state->filter(rec, state->private_data)) {
926                         messaging_filtered_read_done(req, rec);
927
928                         /*
929                          * Only the first one gets the fd-array
930                          */
931                         rec->num_fds = 0;
932                         rec->fds = NULL;
933                 }
934
935                 i += 1;
936         }
937
938         /*
939          * If the fd-array isn't used, just close it.
940          */
941         for (j=0; j < rec->num_fds; j++) {
942                 int fd = rec->fds[j];
943                 close(fd);
944         }
945         rec->num_fds = 0;
946         rec->fds = NULL;
947 }
948
949 static int mess_parent_dgm_cleanup(void *private_data);
950 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
951
952 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
953 {
954         struct tevent_req *req;
955
956         req = background_job_send(
957                 msg, msg->event_ctx, msg, NULL, 0,
958                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
959                             60*15),
960                 mess_parent_dgm_cleanup, msg);
961         if (req == NULL) {
962                 return false;
963         }
964         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
965         return true;
966 }
967
968 static int mess_parent_dgm_cleanup(void *private_data)
969 {
970         int ret;
971
972         ret = messaging_dgm_wipe();
973         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
974                    ret ? strerror(ret) : "ok"));
975         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
976                            60*15);
977 }
978
979 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
980 {
981         struct messaging_context *msg = tevent_req_callback_data(
982                 req, struct messaging_context);
983         NTSTATUS status;
984
985         status = background_job_recv(req);
986         TALLOC_FREE(req);
987         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
988                   nt_errstr(status)));
989
990         req = background_job_send(
991                 msg, msg->event_ctx, msg, NULL, 0,
992                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
993                             60*15),
994                 mess_parent_dgm_cleanup, msg);
995         if (req == NULL) {
996                 DEBUG(1, ("background_job_send failed\n"));
997                 return;
998         }
999         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1000 }
1001
1002 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1003 {
1004         int ret;
1005
1006         if (pid == 0) {
1007                 ret = messaging_dgm_wipe();
1008         } else {
1009                 ret = messaging_dgm_cleanup(pid);
1010         }
1011
1012         return ret;
1013 }
1014
1015 struct tevent_context *messaging_tevent_context(
1016         struct messaging_context *msg_ctx)
1017 {
1018         return msg_ctx->event_ctx;
1019 }
1020
1021 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1022 {
1023         return msg_ctx->names_db;
1024 }
1025
1026 /** @} **/