messages_ctdb: Handle async msgs for nested event contexts
[metze/samba/wip.git] / source3 / lib / messages.c
1 /* 
2    Unix SMB/CIFS implementation.
3    Samba internal messaging functions
4    Copyright (C) Andrew Tridgell 2000
5    Copyright (C) 2001 by Martin Pool
6    Copyright (C) 2002 by Jeremy Allison
7    Copyright (C) 2007 by Volker Lendecke
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18
19    You should have received a copy of the GNU General Public License
20    along with this program.  If not, see <http://www.gnu.org/licenses/>.
21 */
22
23 /**
24   @defgroup messages Internal messaging framework
25   @{
26   @file messages.c
27
28   @brief  Module for internal messaging between Samba daemons. 
29
30    The idea is that if a part of Samba wants to do communication with
31    another Samba process then it will do a message_register() of a
32    dispatch function, and use message_send_pid() to send messages to
33    that process.
34
35    The dispatch function is given the pid of the sender, and it can
36    use that to reply by message_send_pid().  See ping_message() for a
37    simple example.
38
39    @caution Dispatch functions must be able to cope with incoming
40    messages on an *odd* byte boundary.
41
42    This system doesn't have any inherent size limitations but is not
43    very efficient for large messages or when messages are sent in very
44    quick succession.
45
46 */
47
48 #include "includes.h"
49 #include "lib/util/server_id.h"
50 #include "dbwrap/dbwrap.h"
51 #include "serverid.h"
52 #include "messages.h"
53 #include "lib/util/tevent_unix.h"
54 #include "lib/background.h"
55 #include "lib/messages_dgm.h"
56 #include "lib/messages_ctdbd.h"
57 #include "lib/util/iov_buf.h"
58 #include "lib/util/server_id_db.h"
59 #include "lib/messages_dgm_ref.h"
60 #include "lib/messages_util.h"
61
62 struct messaging_callback {
63         struct messaging_callback *prev, *next;
64         uint32_t msg_type;
65         void (*fn)(struct messaging_context *msg, void *private_data, 
66                    uint32_t msg_type, 
67                    struct server_id server_id, DATA_BLOB *data);
68         void *private_data;
69 };
70
71 struct messaging_registered_ev {
72         struct tevent_context *ev;
73         struct tevent_immediate *im;
74         size_t refcount;
75 };
76
77 struct messaging_context {
78         struct server_id id;
79         struct tevent_context *event_ctx;
80         struct messaging_callback *callbacks;
81
82         struct messaging_rec *posted_msgs;
83
84         struct messaging_registered_ev *event_contexts;
85
86         struct tevent_req **new_waiters;
87         size_t num_new_waiters;
88
89         struct tevent_req **waiters;
90         size_t num_waiters;
91
92         void *msg_dgm_ref;
93         struct messaging_backend *remote;
94         struct messaging_ctdbd_fde *cluster_fde;
95
96         struct server_id_db *names_db;
97 };
98
99 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
100                                                struct messaging_rec *rec);
101 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
102                                        struct messaging_rec *rec);
103 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
104                                        struct tevent_context *ev,
105                                        struct messaging_rec *rec);
106 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
107                                    struct tevent_context *ev,
108                                    struct messaging_rec *rec);
109
110 /****************************************************************************
111  A useful function for testing the message system.
112 ****************************************************************************/
113
114 static void ping_message(struct messaging_context *msg_ctx,
115                          void *private_data,
116                          uint32_t msg_type,
117                          struct server_id src,
118                          DATA_BLOB *data)
119 {
120         struct server_id_buf idbuf;
121
122         DEBUG(1, ("INFO: Received PING message from PID %s [%.*s]\n",
123                   server_id_str_buf(src, &idbuf), (int)data->length,
124                   data->data ? (char *)data->data : ""));
125
126         messaging_send(msg_ctx, src, MSG_PONG, data);
127 }
128
129 struct messaging_rec *messaging_rec_create(
130         TALLOC_CTX *mem_ctx, struct server_id src, struct server_id dst,
131         uint32_t msg_type, const struct iovec *iov, int iovlen,
132         const int *fds, size_t num_fds)
133 {
134         ssize_t buflen;
135         uint8_t *buf;
136         struct messaging_rec *result;
137
138         if (num_fds > INT8_MAX) {
139                 return NULL;
140         }
141
142         buflen = iov_buflen(iov, iovlen);
143         if (buflen == -1) {
144                 return NULL;
145         }
146         buf = talloc_array(mem_ctx, uint8_t, buflen);
147         if (buf == NULL) {
148                 return NULL;
149         }
150         iov_buf(iov, iovlen, buf, buflen);
151
152         {
153                 struct messaging_rec rec;
154                 int64_t fds64[num_fds];
155                 size_t i;
156
157                 for (i=0; i<num_fds; i++) {
158                         fds64[i] = fds[i];
159                 }
160
161                 rec = (struct messaging_rec) {
162                         .msg_version = MESSAGE_VERSION, .msg_type = msg_type,
163                         .src = src, .dest = dst,
164                         .buf.data = buf, .buf.length = buflen,
165                         .num_fds = num_fds, .fds = fds64,
166                 };
167
168                 result = messaging_rec_dup(mem_ctx, &rec);
169         }
170
171         TALLOC_FREE(buf);
172
173         return result;
174 }
175
176 static bool messaging_register_event_context(struct messaging_context *ctx,
177                                              struct tevent_context *ev)
178 {
179         size_t i, num_event_contexts;
180         struct messaging_registered_ev *free_reg = NULL;
181         struct messaging_registered_ev *tmp;
182
183         num_event_contexts = talloc_array_length(ctx->event_contexts);
184
185         for (i=0; i<num_event_contexts; i++) {
186                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
187
188                 if (reg->ev == ev) {
189                         reg->refcount += 1;
190                         return true;
191                 }
192                 if (reg->refcount == 0) {
193                         if (reg->ev != NULL) {
194                                 abort();
195                         }
196                         free_reg = reg;
197                 }
198         }
199
200         if (free_reg == NULL) {
201                 tmp = talloc_realloc(ctx, ctx->event_contexts,
202                                      struct messaging_registered_ev,
203                                      num_event_contexts+1);
204                 if (tmp == NULL) {
205                         return false;
206                 }
207                 ctx->event_contexts = tmp;
208
209                 free_reg = &ctx->event_contexts[num_event_contexts];
210         }
211
212         *free_reg = (struct messaging_registered_ev) { .ev = ev, .refcount = 1 };
213
214         return true;
215 }
216
217 static bool messaging_deregister_event_context(struct messaging_context *ctx,
218                                                struct tevent_context *ev)
219 {
220         size_t i, num_event_contexts;
221
222         num_event_contexts = talloc_array_length(ctx->event_contexts);
223
224         for (i=0; i<num_event_contexts; i++) {
225                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
226
227                 if (reg->ev == ev) {
228                         if (reg->refcount == 0) {
229                                 return false;
230                         }
231                         reg->refcount -= 1;
232
233                         if (reg->refcount == 0) {
234                                 /*
235                                  * Not strictly necessary, just
236                                  * paranoia
237                                  */
238                                 reg->ev = NULL;
239
240                                 /*
241                                  * Do not talloc_free(reg->im),
242                                  * recycle immediates events.
243                                  */
244                         }
245                         return true;
246                 }
247         }
248         return false;
249 }
250
251 static void messaging_post_main_event_context(struct tevent_context *ev,
252                                               struct tevent_immediate *im,
253                                               void *private_data)
254 {
255         struct messaging_context *ctx = talloc_get_type_abort(
256                 private_data, struct messaging_context);
257
258         while (ctx->posted_msgs != NULL) {
259                 struct messaging_rec *rec = ctx->posted_msgs;
260                 bool consumed;
261
262                 DLIST_REMOVE(ctx->posted_msgs, rec);
263
264                 consumed = messaging_dispatch_classic(ctx, rec);
265                 if (!consumed) {
266                         consumed = messaging_dispatch_waiters(
267                                 ctx, ctx->event_ctx, rec);
268                 }
269
270                 if (!consumed) {
271                         uint8_t i;
272
273                         for (i=0; i<rec->num_fds; i++) {
274                                 close(rec->fds[i]);
275                         }
276                 }
277
278                 TALLOC_FREE(rec);
279         }
280 }
281
282 static void messaging_post_sub_event_context(struct tevent_context *ev,
283                                              struct tevent_immediate *im,
284                                              void *private_data)
285 {
286         struct messaging_context *ctx = talloc_get_type_abort(
287                 private_data, struct messaging_context);
288         struct messaging_rec *rec, *next;
289
290         for (rec = ctx->posted_msgs; rec != NULL; rec = next) {
291                 bool consumed;
292
293                 next = rec->next;
294
295                 consumed = messaging_dispatch_waiters(ctx, ev, rec);
296                 if (consumed) {
297                         DLIST_REMOVE(ctx->posted_msgs, rec);
298                         TALLOC_FREE(rec);
299                 }
300         }
301 }
302
303 static bool messaging_alert_event_contexts(struct messaging_context *ctx)
304 {
305         size_t i, num_event_contexts;
306
307         num_event_contexts = talloc_array_length(ctx->event_contexts);
308
309         for (i=0; i<num_event_contexts; i++) {
310                 struct messaging_registered_ev *reg = &ctx->event_contexts[i];
311
312                 if (reg->refcount == 0) {
313                         continue;
314                 }
315
316                 if (reg->im == NULL) {
317                         reg->im = tevent_create_immediate(
318                                 ctx->event_contexts);
319                 }
320                 if (reg->im == NULL) {
321                         DBG_WARNING("Could not create immediate\n");
322                         continue;
323                 }
324
325                 /*
326                  * We depend on schedule_immediate to work
327                  * multiple times. Might be a bit inefficient,
328                  * but this needs to be proven in tests. The
329                  * alternatively would be to track whether the
330                  * immediate has already been scheduled. For
331                  * now, avoid that complexity here.
332                  */
333
334                 if (reg->ev == ctx->event_ctx) {
335                         tevent_schedule_immediate(
336                                 reg->im, reg->ev,
337                                 messaging_post_main_event_context,
338                                 ctx);
339                 } else {
340                         tevent_schedule_immediate(
341                                 reg->im, reg->ev,
342                                 messaging_post_sub_event_context,
343                                 ctx);
344                 }
345
346         }
347         return true;
348 }
349
350 static void messaging_recv_cb(struct tevent_context *ev,
351                               const uint8_t *msg, size_t msg_len,
352                               int *fds, size_t num_fds,
353                               void *private_data)
354 {
355         struct messaging_context *msg_ctx = talloc_get_type_abort(
356                 private_data, struct messaging_context);
357         struct server_id_buf idbuf;
358         struct messaging_rec rec;
359         int64_t fds64[MIN(num_fds, INT8_MAX)];
360         size_t i;
361
362         if (msg_len < MESSAGE_HDR_LENGTH) {
363                 DBG_WARNING("message too short: %zu\n", msg_len);
364                 goto close_fail;
365         }
366
367         if (num_fds > INT8_MAX) {
368                 DBG_WARNING("too many fds: %zu\n", num_fds);
369                 goto close_fail;
370         }
371
372         /*
373          * "consume" the fds by copying them and setting
374          * the original variable to -1
375          */
376         for (i=0; i < num_fds; i++) {
377                 fds64[i] = fds[i];
378                 fds[i] = -1;
379         }
380
381         rec = (struct messaging_rec) {
382                 .msg_version = MESSAGE_VERSION,
383                 .buf.data = discard_const_p(uint8_t, msg) + MESSAGE_HDR_LENGTH,
384                 .buf.length = msg_len - MESSAGE_HDR_LENGTH,
385                 .num_fds = num_fds,
386                 .fds = fds64,
387         };
388
389         message_hdr_get(&rec.msg_type, &rec.src, &rec.dest, msg);
390
391         DBG_DEBUG("Received message 0x%x len %zu (num_fds:%zu) from %s\n",
392                   (unsigned)rec.msg_type, rec.buf.length, num_fds,
393                   server_id_str_buf(rec.src, &idbuf));
394
395         messaging_dispatch_rec(msg_ctx, ev, &rec);
396         return;
397
398 close_fail:
399         for (i=0; i < num_fds; i++) {
400                 close(fds[i]);
401         }
402 }
403
404 static int messaging_context_destructor(struct messaging_context *ctx)
405 {
406         size_t i;
407
408         for (i=0; i<ctx->num_new_waiters; i++) {
409                 if (ctx->new_waiters[i] != NULL) {
410                         tevent_req_set_cleanup_fn(ctx->new_waiters[i], NULL);
411                         ctx->new_waiters[i] = NULL;
412                 }
413         }
414         for (i=0; i<ctx->num_waiters; i++) {
415                 if (ctx->waiters[i] != NULL) {
416                         tevent_req_set_cleanup_fn(ctx->waiters[i], NULL);
417                         ctx->waiters[i] = NULL;
418                 }
419         }
420
421         /*
422          * The immediates from messaging_alert_event_contexts
423          * reference "ctx". Don't let them outlive the
424          * messaging_context we're destroying here.
425          */
426         TALLOC_FREE(ctx->event_contexts);
427
428         return 0;
429 }
430
431 static const char *private_path(const char *name)
432 {
433         return talloc_asprintf(talloc_tos(), "%s/%s", lp_private_dir(), name);
434 }
435
436 static NTSTATUS messaging_init_internal(TALLOC_CTX *mem_ctx,
437                                         struct tevent_context *ev,
438                                         struct messaging_context **pmsg_ctx)
439 {
440         TALLOC_CTX *frame;
441         struct messaging_context *ctx;
442         NTSTATUS status = NT_STATUS_UNSUCCESSFUL;
443         int ret;
444         const char *lck_path;
445         const char *priv_path;
446         bool ok;
447
448         lck_path = lock_path("msg.lock");
449         if (lck_path == NULL) {
450                 return NT_STATUS_NO_MEMORY;
451         }
452
453         ok = directory_create_or_exist_strict(lck_path,
454                                               sec_initial_uid(),
455                                               0755);
456         if (!ok) {
457                 DBG_DEBUG("Could not create lock directory: %s\n",
458                           strerror(errno));
459                 return NT_STATUS_ACCESS_DENIED;
460         }
461
462         priv_path = private_path("msg.sock");
463         if (priv_path == NULL) {
464                 return NT_STATUS_NO_MEMORY;
465         }
466
467         ok = directory_create_or_exist_strict(priv_path, sec_initial_uid(),
468                                               0700);
469         if (!ok) {
470                 DBG_DEBUG("Could not create msg directory: %s\n",
471                           strerror(errno));
472                 return NT_STATUS_ACCESS_DENIED;
473         }
474
475         frame = talloc_stackframe();
476         if (frame == NULL) {
477                 return NT_STATUS_NO_MEMORY;
478         }
479
480         ctx = talloc_zero(frame, struct messaging_context);
481         if (ctx == NULL) {
482                 status = NT_STATUS_NO_MEMORY;
483                 goto done;
484         }
485
486         ctx->id = (struct server_id) {
487                 .pid = getpid(), .vnn = NONCLUSTER_VNN
488         };
489
490         ctx->event_ctx = ev;
491
492         ok = messaging_register_event_context(ctx, ev);
493         if (!ok) {
494                 status = NT_STATUS_NO_MEMORY;
495                 goto done;
496         }
497
498         sec_init();
499
500         ctx->msg_dgm_ref = messaging_dgm_ref(ctx,
501                                              ctx->event_ctx,
502                                              &ctx->id.unique_id,
503                                              priv_path,
504                                              lck_path,
505                                              messaging_recv_cb,
506                                              ctx,
507                                              &ret);
508         if (ctx->msg_dgm_ref == NULL) {
509                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
510                 status = map_nt_error_from_unix(ret);
511                 goto done;
512         }
513         talloc_set_destructor(ctx, messaging_context_destructor);
514
515         if (lp_clustering()) {
516                 ret = messaging_ctdbd_init(
517                         ctx, ctx, messaging_recv_cb, ctx, &ctx->remote);
518
519                 if (ret != 0) {
520                         DEBUG(2, ("messaging_ctdbd_init failed: %s\n",
521                                   strerror(ret)));
522                         status = map_nt_error_from_unix(ret);
523                         goto done;
524                 }
525                 ctx->cluster_fde = messaging_ctdbd_register_tevent_context(
526                         ctx, ctx->event_ctx, ctx->remote);
527                 if (ctx->cluster_fde == NULL) {
528                         DBG_WARNING("messaging_ctdbd_register_tevent_context "
529                                     "failed\n");
530                         status = NT_STATUS_NO_MEMORY;
531                         goto done;
532                 }
533         }
534         ctx->id.vnn = get_my_vnn();
535
536         ctx->names_db = server_id_db_init(ctx,
537                                           ctx->id,
538                                           lp_lock_directory(),
539                                           0,
540                                           TDB_INCOMPATIBLE_HASH|TDB_CLEAR_IF_FIRST);
541         if (ctx->names_db == NULL) {
542                 DBG_DEBUG("server_id_db_init failed\n");
543                 status = NT_STATUS_NO_MEMORY;
544                 goto done;
545         }
546
547         messaging_register(ctx, NULL, MSG_PING, ping_message);
548
549         /* Register some debugging related messages */
550
551         register_msg_pool_usage(ctx);
552         register_dmalloc_msgs(ctx);
553         debug_register_msgs(ctx);
554
555         {
556                 struct server_id_buf tmp;
557                 DBG_DEBUG("my id: %s\n", server_id_str_buf(ctx->id, &tmp));
558         }
559
560         *pmsg_ctx = talloc_steal(mem_ctx, ctx);
561
562         status = NT_STATUS_OK;
563 done:
564         TALLOC_FREE(frame);
565
566         return status;
567 }
568
569 struct messaging_context *messaging_init(TALLOC_CTX *mem_ctx,
570                                          struct tevent_context *ev)
571 {
572         struct messaging_context *ctx = NULL;
573         NTSTATUS status;
574
575         status = messaging_init_internal(mem_ctx,
576                                          ev,
577                                          &ctx);
578         if (!NT_STATUS_IS_OK(status)) {
579                 return NULL;
580         }
581
582         return ctx;
583 }
584
585 NTSTATUS messaging_init_client(TALLOC_CTX *mem_ctx,
586                                struct tevent_context *ev,
587                                struct messaging_context **pmsg_ctx)
588 {
589         return messaging_init_internal(mem_ctx,
590                                         ev,
591                                         pmsg_ctx);
592 }
593
594 struct server_id messaging_server_id(const struct messaging_context *msg_ctx)
595 {
596         return msg_ctx->id;
597 }
598
599 /*
600  * re-init after a fork
601  */
602 NTSTATUS messaging_reinit(struct messaging_context *msg_ctx)
603 {
604         int ret;
605         char *lck_path;
606
607         TALLOC_FREE(msg_ctx->msg_dgm_ref);
608
609         msg_ctx->id = (struct server_id) {
610                 .pid = getpid(), .vnn = msg_ctx->id.vnn
611         };
612
613         lck_path = lock_path("msg.lock");
614         if (lck_path == NULL) {
615                 return NT_STATUS_NO_MEMORY;
616         }
617
618         msg_ctx->msg_dgm_ref = messaging_dgm_ref(
619                 msg_ctx, msg_ctx->event_ctx, &msg_ctx->id.unique_id,
620                 private_path("msg.sock"), lck_path,
621                 messaging_recv_cb, msg_ctx, &ret);
622
623         if (msg_ctx->msg_dgm_ref == NULL) {
624                 DEBUG(2, ("messaging_dgm_ref failed: %s\n", strerror(ret)));
625                 return map_nt_error_from_unix(ret);
626         }
627
628         if (lp_clustering()) {
629                 TALLOC_FREE(msg_ctx->cluster_fde);
630
631                 ret = messaging_ctdbd_reinit(
632                         msg_ctx, msg_ctx, messaging_recv_cb, msg_ctx,
633                         msg_ctx->remote);
634
635                 if (ret != 0) {
636                         DEBUG(1, ("messaging_ctdbd_init failed: %s\n",
637                                   strerror(ret)));
638                         return map_nt_error_from_unix(ret);
639                 }
640
641                 msg_ctx->cluster_fde = messaging_ctdbd_register_tevent_context(
642                         msg_ctx, msg_ctx->event_ctx, msg_ctx->remote);
643                 if (msg_ctx->cluster_fde == NULL) {
644                         DBG_WARNING("messaging_ctdbd_register_tevent_context "
645                                     "failed\n");
646                         return NT_STATUS_NO_MEMORY;
647                 }
648         }
649
650         server_id_db_reinit(msg_ctx->names_db, msg_ctx->id);
651
652         return NT_STATUS_OK;
653 }
654
655
656 /*
657  * Register a dispatch function for a particular message type. Allow multiple
658  * registrants
659 */
660 NTSTATUS messaging_register(struct messaging_context *msg_ctx,
661                             void *private_data,
662                             uint32_t msg_type,
663                             void (*fn)(struct messaging_context *msg,
664                                        void *private_data, 
665                                        uint32_t msg_type, 
666                                        struct server_id server_id,
667                                        DATA_BLOB *data))
668 {
669         struct messaging_callback *cb;
670
671         DEBUG(5, ("Registering messaging pointer for type %u - "
672                   "private_data=%p\n",
673                   (unsigned)msg_type, private_data));
674
675         /*
676          * Only one callback per type
677          */
678
679         for (cb = msg_ctx->callbacks; cb != NULL; cb = cb->next) {
680                 /* we allow a second registration of the same message
681                    type if it has a different private pointer. This is
682                    needed in, for example, the internal notify code,
683                    which creates a new notify context for each tree
684                    connect, and expects to receive messages to each of
685                    them. */
686                 if (cb->msg_type == msg_type && private_data == cb->private_data) {
687                         DEBUG(5,("Overriding messaging pointer for type %u - private_data=%p\n",
688                                   (unsigned)msg_type, private_data));
689                         cb->fn = fn;
690                         cb->private_data = private_data;
691                         return NT_STATUS_OK;
692                 }
693         }
694
695         if (!(cb = talloc(msg_ctx, struct messaging_callback))) {
696                 return NT_STATUS_NO_MEMORY;
697         }
698
699         cb->msg_type = msg_type;
700         cb->fn = fn;
701         cb->private_data = private_data;
702
703         DLIST_ADD(msg_ctx->callbacks, cb);
704         return NT_STATUS_OK;
705 }
706
707 /*
708   De-register the function for a particular message type.
709 */
710 void messaging_deregister(struct messaging_context *ctx, uint32_t msg_type,
711                           void *private_data)
712 {
713         struct messaging_callback *cb, *next;
714
715         for (cb = ctx->callbacks; cb; cb = next) {
716                 next = cb->next;
717                 if ((cb->msg_type == msg_type)
718                     && (cb->private_data == private_data)) {
719                         DEBUG(5,("Deregistering messaging pointer for type %u - private_data=%p\n",
720                                   (unsigned)msg_type, private_data));
721                         DLIST_REMOVE(ctx->callbacks, cb);
722                         TALLOC_FREE(cb);
723                 }
724         }
725 }
726
727 /*
728   Send a message to a particular server
729 */
730 NTSTATUS messaging_send(struct messaging_context *msg_ctx,
731                         struct server_id server, uint32_t msg_type,
732                         const DATA_BLOB *data)
733 {
734         struct iovec iov = {0};
735
736         if (data != NULL) {
737                 iov.iov_base = data->data;
738                 iov.iov_len = data->length;
739         };
740
741         return messaging_send_iov(msg_ctx, server, msg_type, &iov, 1, NULL, 0);
742 }
743
744 NTSTATUS messaging_send_buf(struct messaging_context *msg_ctx,
745                             struct server_id server, uint32_t msg_type,
746                             const uint8_t *buf, size_t len)
747 {
748         DATA_BLOB blob = data_blob_const(buf, len);
749         return messaging_send(msg_ctx, server, msg_type, &blob);
750 }
751
752 static int messaging_post_self(struct messaging_context *msg_ctx,
753                                struct server_id src, struct server_id dst,
754                                uint32_t msg_type,
755                                const struct iovec *iov, int iovlen,
756                                const int *fds, size_t num_fds)
757 {
758         struct messaging_rec *rec;
759         bool ok;
760
761         rec = messaging_rec_create(
762                 msg_ctx, src, dst, msg_type, iov, iovlen, fds, num_fds);
763         if (rec == NULL) {
764                 return ENOMEM;
765         }
766
767         ok = messaging_alert_event_contexts(msg_ctx);
768         if (!ok) {
769                 TALLOC_FREE(rec);
770                 return ENOMEM;
771         }
772
773         DLIST_ADD_END(msg_ctx->posted_msgs, rec);
774
775         return 0;
776 }
777
778 int messaging_send_iov_from(struct messaging_context *msg_ctx,
779                             struct server_id src, struct server_id dst,
780                             uint32_t msg_type,
781                             const struct iovec *iov, int iovlen,
782                             const int *fds, size_t num_fds)
783 {
784         int ret;
785         uint8_t hdr[MESSAGE_HDR_LENGTH];
786         struct iovec iov2[iovlen+1];
787
788         if (server_id_is_disconnected(&dst)) {
789                 return EINVAL;
790         }
791
792         if (num_fds > INT8_MAX) {
793                 return EINVAL;
794         }
795
796         if (dst.vnn != msg_ctx->id.vnn) {
797                 if (num_fds > 0) {
798                         return ENOSYS;
799                 }
800
801                 ret = msg_ctx->remote->send_fn(src, dst,
802                                                msg_type, iov, iovlen,
803                                                NULL, 0,
804                                                msg_ctx->remote);
805                 return ret;
806         }
807
808         if (server_id_equal(&dst, &msg_ctx->id)) {
809                 ret = messaging_post_self(msg_ctx, src, dst, msg_type,
810                                           iov, iovlen, fds, num_fds);
811                 return ret;
812         }
813
814         message_hdr_put(hdr, msg_type, src, dst);
815         iov2[0] = (struct iovec){ .iov_base = hdr, .iov_len = sizeof(hdr) };
816         memcpy(&iov2[1], iov, iovlen * sizeof(*iov));
817
818         ret = messaging_dgm_send(dst.pid, iov2, iovlen+1, fds, num_fds);
819
820         if (ret == EACCES) {
821                 become_root();
822                 ret = messaging_dgm_send(dst.pid, iov2, iovlen+1,
823                                          fds, num_fds);
824                 unbecome_root();
825         }
826
827         if (ret == ECONNREFUSED) {
828                 /*
829                  * Linux returns this when a socket exists in the file
830                  * system without a listening process. This is not
831                  * documented in susv4 or the linux manpages, but it's
832                  * easily testable. For the higher levels this is the
833                  * same as "destination does not exist"
834                  */
835                 ret = ENOENT;
836         }
837
838         return ret;
839 }
840
841 NTSTATUS messaging_send_iov(struct messaging_context *msg_ctx,
842                             struct server_id dst, uint32_t msg_type,
843                             const struct iovec *iov, int iovlen,
844                             const int *fds, size_t num_fds)
845 {
846         int ret;
847
848         ret = messaging_send_iov_from(msg_ctx, msg_ctx->id, dst, msg_type,
849                                       iov, iovlen, fds, num_fds);
850         if (ret != 0) {
851                 return map_nt_error_from_unix(ret);
852         }
853         return NT_STATUS_OK;
854 }
855
856 static struct messaging_rec *messaging_rec_dup(TALLOC_CTX *mem_ctx,
857                                                struct messaging_rec *rec)
858 {
859         struct messaging_rec *result;
860         size_t fds_size = sizeof(int64_t) * rec->num_fds;
861         size_t payload_len;
862
863         payload_len = rec->buf.length + fds_size;
864         if (payload_len < rec->buf.length) {
865                 /* overflow */
866                 return NULL;
867         }
868
869         result = talloc_pooled_object(mem_ctx, struct messaging_rec, 2,
870                                       payload_len);
871         if (result == NULL) {
872                 return NULL;
873         }
874         *result = *rec;
875
876         /* Doesn't fail, see talloc_pooled_object */
877
878         result->buf.data = talloc_memdup(result, rec->buf.data,
879                                          rec->buf.length);
880
881         result->fds = NULL;
882         if (result->num_fds > 0) {
883                 result->fds = talloc_memdup(result, rec->fds, fds_size);
884         }
885
886         return result;
887 }
888
889 struct messaging_filtered_read_state {
890         struct tevent_context *ev;
891         struct messaging_context *msg_ctx;
892         struct messaging_dgm_fde *fde;
893         struct messaging_ctdbd_fde *cluster_fde;
894
895         bool (*filter)(struct messaging_rec *rec, void *private_data);
896         void *private_data;
897
898         struct messaging_rec *rec;
899 };
900
901 static void messaging_filtered_read_cleanup(struct tevent_req *req,
902                                             enum tevent_req_state req_state);
903
904 struct tevent_req *messaging_filtered_read_send(
905         TALLOC_CTX *mem_ctx, struct tevent_context *ev,
906         struct messaging_context *msg_ctx,
907         bool (*filter)(struct messaging_rec *rec, void *private_data),
908         void *private_data)
909 {
910         struct tevent_req *req;
911         struct messaging_filtered_read_state *state;
912         size_t new_waiters_len;
913         bool ok;
914
915         req = tevent_req_create(mem_ctx, &state,
916                                 struct messaging_filtered_read_state);
917         if (req == NULL) {
918                 return NULL;
919         }
920         state->ev = ev;
921         state->msg_ctx = msg_ctx;
922         state->filter = filter;
923         state->private_data = private_data;
924
925         /*
926          * We have to defer the callback here, as we might be called from
927          * within a different tevent_context than state->ev
928          */
929         tevent_req_defer_callback(req, state->ev);
930
931         state->fde = messaging_dgm_register_tevent_context(state, ev);
932         if (tevent_req_nomem(state->fde, req)) {
933                 return tevent_req_post(req, ev);
934         }
935
936         if (msg_ctx->remote != NULL) {
937                 state->cluster_fde = messaging_ctdbd_register_tevent_context(
938                         state, ev, msg_ctx->remote);
939                 if (tevent_req_nomem(state->cluster_fde, req)) {
940                         return tevent_req_post(req, ev);
941                 }
942         }
943
944         /*
945          * We add ourselves to the "new_waiters" array, not the "waiters"
946          * array. If we are called from within messaging_read_done,
947          * messaging_dispatch_rec will be in an active for-loop on
948          * "waiters". We must be careful not to mess with this array, because
949          * it could mean that a single event is being delivered twice.
950          */
951
952         new_waiters_len = talloc_array_length(msg_ctx->new_waiters);
953
954         if (new_waiters_len == msg_ctx->num_new_waiters) {
955                 struct tevent_req **tmp;
956
957                 tmp = talloc_realloc(msg_ctx, msg_ctx->new_waiters,
958                                      struct tevent_req *, new_waiters_len+1);
959                 if (tevent_req_nomem(tmp, req)) {
960                         return tevent_req_post(req, ev);
961                 }
962                 msg_ctx->new_waiters = tmp;
963         }
964
965         msg_ctx->new_waiters[msg_ctx->num_new_waiters] = req;
966         msg_ctx->num_new_waiters += 1;
967         tevent_req_set_cleanup_fn(req, messaging_filtered_read_cleanup);
968
969         ok = messaging_register_event_context(msg_ctx, ev);
970         if (!ok) {
971                 tevent_req_oom(req);
972                 return tevent_req_post(req, ev);
973         }
974
975         return req;
976 }
977
978 static void messaging_filtered_read_cleanup(struct tevent_req *req,
979                                             enum tevent_req_state req_state)
980 {
981         struct messaging_filtered_read_state *state = tevent_req_data(
982                 req, struct messaging_filtered_read_state);
983         struct messaging_context *msg_ctx = state->msg_ctx;
984         size_t i;
985         bool ok;
986
987         tevent_req_set_cleanup_fn(req, NULL);
988
989         TALLOC_FREE(state->fde);
990         TALLOC_FREE(state->cluster_fde);
991
992         ok = messaging_deregister_event_context(msg_ctx, state->ev);
993         if (!ok) {
994                 abort();
995         }
996
997         /*
998          * Just set the [new_]waiters entry to NULL, be careful not to mess
999          * with the other "waiters" array contents. We are often called from
1000          * within "messaging_dispatch_rec", which loops over
1001          * "waiters". Messing with the "waiters" array will mess up that
1002          * for-loop.
1003          */
1004
1005         for (i=0; i<msg_ctx->num_waiters; i++) {
1006                 if (msg_ctx->waiters[i] == req) {
1007                         msg_ctx->waiters[i] = NULL;
1008                         return;
1009                 }
1010         }
1011
1012         for (i=0; i<msg_ctx->num_new_waiters; i++) {
1013                 if (msg_ctx->new_waiters[i] == req) {
1014                         msg_ctx->new_waiters[i] = NULL;
1015                         return;
1016                 }
1017         }
1018 }
1019
1020 static void messaging_filtered_read_done(struct tevent_req *req,
1021                                          struct messaging_rec *rec)
1022 {
1023         struct messaging_filtered_read_state *state = tevent_req_data(
1024                 req, struct messaging_filtered_read_state);
1025
1026         state->rec = messaging_rec_dup(state, rec);
1027         if (tevent_req_nomem(state->rec, req)) {
1028                 return;
1029         }
1030         tevent_req_done(req);
1031 }
1032
1033 int messaging_filtered_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1034                                  struct messaging_rec **presult)
1035 {
1036         struct messaging_filtered_read_state *state = tevent_req_data(
1037                 req, struct messaging_filtered_read_state);
1038         int err;
1039
1040         if (tevent_req_is_unix_error(req, &err)) {
1041                 tevent_req_received(req);
1042                 return err;
1043         }
1044         if (presult != NULL) {
1045                 *presult = talloc_move(mem_ctx, &state->rec);
1046         }
1047         return 0;
1048 }
1049
1050 struct messaging_read_state {
1051         uint32_t msg_type;
1052         struct messaging_rec *rec;
1053 };
1054
1055 static bool messaging_read_filter(struct messaging_rec *rec,
1056                                   void *private_data);
1057 static void messaging_read_done(struct tevent_req *subreq);
1058
1059 struct tevent_req *messaging_read_send(TALLOC_CTX *mem_ctx,
1060                                        struct tevent_context *ev,
1061                                        struct messaging_context *msg,
1062                                        uint32_t msg_type)
1063 {
1064         struct tevent_req *req, *subreq;
1065         struct messaging_read_state *state;
1066
1067         req = tevent_req_create(mem_ctx, &state,
1068                                 struct messaging_read_state);
1069         if (req == NULL) {
1070                 return NULL;
1071         }
1072         state->msg_type = msg_type;
1073
1074         subreq = messaging_filtered_read_send(state, ev, msg,
1075                                               messaging_read_filter, state);
1076         if (tevent_req_nomem(subreq, req)) {
1077                 return tevent_req_post(req, ev);
1078         }
1079         tevent_req_set_callback(subreq, messaging_read_done, req);
1080         return req;
1081 }
1082
1083 static bool messaging_read_filter(struct messaging_rec *rec,
1084                                   void *private_data)
1085 {
1086         struct messaging_read_state *state = talloc_get_type_abort(
1087                 private_data, struct messaging_read_state);
1088
1089         if (rec->num_fds != 0) {
1090                 return false;
1091         }
1092
1093         return rec->msg_type == state->msg_type;
1094 }
1095
1096 static void messaging_read_done(struct tevent_req *subreq)
1097 {
1098         struct tevent_req *req = tevent_req_callback_data(
1099                 subreq, struct tevent_req);
1100         struct messaging_read_state *state = tevent_req_data(
1101                 req, struct messaging_read_state);
1102         int ret;
1103
1104         ret = messaging_filtered_read_recv(subreq, state, &state->rec);
1105         TALLOC_FREE(subreq);
1106         if (tevent_req_error(req, ret)) {
1107                 return;
1108         }
1109         tevent_req_done(req);
1110 }
1111
1112 int messaging_read_recv(struct tevent_req *req, TALLOC_CTX *mem_ctx,
1113                         struct messaging_rec **presult)
1114 {
1115         struct messaging_read_state *state = tevent_req_data(
1116                 req, struct messaging_read_state);
1117         int err;
1118
1119         if (tevent_req_is_unix_error(req, &err)) {
1120                 return err;
1121         }
1122         if (presult != NULL) {
1123                 *presult = talloc_move(mem_ctx, &state->rec);
1124         }
1125         return 0;
1126 }
1127
1128 static bool messaging_append_new_waiters(struct messaging_context *msg_ctx)
1129 {
1130         if (msg_ctx->num_new_waiters == 0) {
1131                 return true;
1132         }
1133
1134         if (talloc_array_length(msg_ctx->waiters) <
1135             (msg_ctx->num_waiters + msg_ctx->num_new_waiters)) {
1136                 struct tevent_req **tmp;
1137                 tmp = talloc_realloc(
1138                         msg_ctx, msg_ctx->waiters, struct tevent_req *,
1139                         msg_ctx->num_waiters + msg_ctx->num_new_waiters);
1140                 if (tmp == NULL) {
1141                         DEBUG(1, ("%s: talloc failed\n", __func__));
1142                         return false;
1143                 }
1144                 msg_ctx->waiters = tmp;
1145         }
1146
1147         memcpy(&msg_ctx->waiters[msg_ctx->num_waiters], msg_ctx->new_waiters,
1148                sizeof(struct tevent_req *) * msg_ctx->num_new_waiters);
1149
1150         msg_ctx->num_waiters += msg_ctx->num_new_waiters;
1151         msg_ctx->num_new_waiters = 0;
1152
1153         return true;
1154 }
1155
1156 static bool messaging_dispatch_classic(struct messaging_context *msg_ctx,
1157                                        struct messaging_rec *rec)
1158 {
1159         struct messaging_callback *cb, *next;
1160
1161         for (cb = msg_ctx->callbacks; cb != NULL; cb = next) {
1162                 size_t j;
1163
1164                 next = cb->next;
1165                 if (cb->msg_type != rec->msg_type) {
1166                         continue;
1167                 }
1168
1169                 /*
1170                  * the old style callbacks don't support fd passing
1171                  */
1172                 for (j=0; j < rec->num_fds; j++) {
1173                         int fd = rec->fds[j];
1174                         close(fd);
1175                 }
1176                 rec->num_fds = 0;
1177                 rec->fds = NULL;
1178
1179                 cb->fn(msg_ctx, cb->private_data, rec->msg_type,
1180                        rec->src, &rec->buf);
1181
1182                 return true;
1183         }
1184
1185         return false;
1186 }
1187
1188 static bool messaging_dispatch_waiters(struct messaging_context *msg_ctx,
1189                                        struct tevent_context *ev,
1190                                        struct messaging_rec *rec)
1191 {
1192         size_t i;
1193
1194         if (!messaging_append_new_waiters(msg_ctx)) {
1195                 return false;
1196         }
1197
1198         i = 0;
1199         while (i < msg_ctx->num_waiters) {
1200                 struct tevent_req *req;
1201                 struct messaging_filtered_read_state *state;
1202
1203                 req = msg_ctx->waiters[i];
1204                 if (req == NULL) {
1205                         /*
1206                          * This got cleaned up. In the meantime,
1207                          * move everything down one. We need
1208                          * to keep the order of waiters, as
1209                          * other code may depend on this.
1210                          */
1211                         if (i < msg_ctx->num_waiters - 1) {
1212                                 memmove(&msg_ctx->waiters[i],
1213                                         &msg_ctx->waiters[i+1],
1214                                         sizeof(struct tevent_req *) *
1215                                             (msg_ctx->num_waiters - i - 1));
1216                         }
1217                         msg_ctx->num_waiters -= 1;
1218                         continue;
1219                 }
1220
1221                 state = tevent_req_data(
1222                         req, struct messaging_filtered_read_state);
1223                 if ((ev == state->ev) &&
1224                     state->filter(rec, state->private_data)) {
1225                         messaging_filtered_read_done(req, rec);
1226                         return true;
1227                 }
1228
1229                 i += 1;
1230         }
1231
1232         return false;
1233 }
1234
1235 /*
1236   Dispatch one messaging_rec
1237 */
1238 static void messaging_dispatch_rec(struct messaging_context *msg_ctx,
1239                                    struct tevent_context *ev,
1240                                    struct messaging_rec *rec)
1241 {
1242         bool consumed;
1243         size_t i;
1244
1245         if (ev == msg_ctx->event_ctx) {
1246                 consumed = messaging_dispatch_classic(msg_ctx, rec);
1247                 if (consumed) {
1248                         return;
1249                 }
1250         }
1251
1252         consumed = messaging_dispatch_waiters(msg_ctx, ev, rec);
1253         if (consumed) {
1254                 return;
1255         }
1256
1257         if (ev != msg_ctx->event_ctx) {
1258                 struct iovec iov;
1259                 int fds[rec->num_fds];
1260                 int ret;
1261
1262                 /*
1263                  * We've been listening on a nested event
1264                  * context. Messages need to be handled in the main
1265                  * event context, so post to ourselves
1266                  */
1267
1268                 iov.iov_base = rec->buf.data;
1269                 iov.iov_len = rec->buf.length;
1270
1271                 for (i=0; i<rec->num_fds; i++) {
1272                         fds[i] = rec->fds[i];
1273                 }
1274
1275                 ret = messaging_post_self(
1276                         msg_ctx, rec->src, rec->dest, rec->msg_type,
1277                         &iov, 1, fds, rec->num_fds);
1278                 if (ret == 0) {
1279                         return;
1280                 }
1281         }
1282
1283         /*
1284          * If the fd-array isn't used, just close it.
1285          */
1286         for (i=0; i < rec->num_fds; i++) {
1287                 int fd = rec->fds[i];
1288                 close(fd);
1289         }
1290         rec->num_fds = 0;
1291         rec->fds = NULL;
1292 }
1293
1294 static int mess_parent_dgm_cleanup(void *private_data);
1295 static void mess_parent_dgm_cleanup_done(struct tevent_req *req);
1296
1297 bool messaging_parent_dgm_cleanup_init(struct messaging_context *msg)
1298 {
1299         struct tevent_req *req;
1300
1301         req = background_job_send(
1302                 msg, msg->event_ctx, msg, NULL, 0,
1303                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1304                             60*15),
1305                 mess_parent_dgm_cleanup, msg);
1306         if (req == NULL) {
1307                 return false;
1308         }
1309         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1310         return true;
1311 }
1312
1313 static int mess_parent_dgm_cleanup(void *private_data)
1314 {
1315         int ret;
1316
1317         ret = messaging_dgm_wipe();
1318         DEBUG(10, ("messaging_dgm_wipe returned %s\n",
1319                    ret ? strerror(ret) : "ok"));
1320         return lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1321                            60*15);
1322 }
1323
1324 static void mess_parent_dgm_cleanup_done(struct tevent_req *req)
1325 {
1326         struct messaging_context *msg = tevent_req_callback_data(
1327                 req, struct messaging_context);
1328         NTSTATUS status;
1329
1330         status = background_job_recv(req);
1331         TALLOC_FREE(req);
1332         DEBUG(1, ("messaging dgm cleanup job ended with %s\n",
1333                   nt_errstr(status)));
1334
1335         req = background_job_send(
1336                 msg, msg->event_ctx, msg, NULL, 0,
1337                 lp_parm_int(-1, "messaging", "messaging dgm cleanup interval",
1338                             60*15),
1339                 mess_parent_dgm_cleanup, msg);
1340         if (req == NULL) {
1341                 DEBUG(1, ("background_job_send failed\n"));
1342                 return;
1343         }
1344         tevent_req_set_callback(req, mess_parent_dgm_cleanup_done, msg);
1345 }
1346
1347 int messaging_cleanup(struct messaging_context *msg_ctx, pid_t pid)
1348 {
1349         int ret;
1350
1351         if (pid == 0) {
1352                 ret = messaging_dgm_wipe();
1353         } else {
1354                 ret = messaging_dgm_cleanup(pid);
1355         }
1356
1357         return ret;
1358 }
1359
1360 struct tevent_context *messaging_tevent_context(
1361         struct messaging_context *msg_ctx)
1362 {
1363         return msg_ctx->event_ctx;
1364 }
1365
1366 struct server_id_db *messaging_names_db(struct messaging_context *msg_ctx)
1367 {
1368         return msg_ctx->names_db;
1369 }
1370
1371 /** @} **/