s3:unix_msg: pass the fd array to the unix_dgram recv_callback function
[obnox/samba/samba-obnox.git] / source3 / lib / unix_msg / unix_msg.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Copyright (C) Volker Lendecke 2013
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 #include "replace.h"
20 #include "unix_msg.h"
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "dlinklist.h"
25 #include "pthreadpool/pthreadpool.h"
26 #include <fcntl.h>
27
28 /*
29  * This file implements two abstractions: The "unix_dgram" functions implement
30  * queueing for unix domain datagram sockets. You can send to a destination
31  * socket, and if that has no free space available, it will fall back to an
32  * anonymous socket that will poll for writability. "unix_dgram" expects the
33  * data size not to exceed the system limit.
34  *
35  * The "unix_msg" functions implement the fragmentation of large messages on
36  * top of "unix_dgram". This is what is exposed to the user of this API.
37  */
38
39 struct unix_dgram_msg {
40         struct unix_dgram_msg *prev, *next;
41
42         int sock;
43         ssize_t sent;
44         int sys_errno;
45         size_t buflen;
46         uint8_t buf[];
47 };
48
49 struct unix_dgram_send_queue {
50         struct unix_dgram_send_queue *prev, *next;
51         struct unix_dgram_ctx *ctx;
52         int sock;
53         struct unix_dgram_msg *msgs;
54         char path[];
55 };
56
57 struct unix_dgram_ctx {
58         int sock;
59         pid_t created_pid;
60         const struct poll_funcs *ev_funcs;
61         size_t max_msg;
62
63         void (*recv_callback)(struct unix_dgram_ctx *ctx,
64                               uint8_t *msg, size_t msg_len,
65                               int *fds, size_t num_fds,
66                               void *private_data);
67         void *private_data;
68
69         struct poll_watch *sock_read_watch;
70         struct unix_dgram_send_queue *send_queues;
71
72         struct pthreadpool *send_pool;
73         struct poll_watch *pool_read_watch;
74
75         uint8_t *recv_buf;
76         char path[];
77 };
78
79 static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
80 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
81                                     void *private_data);
82
83 /* Set socket non blocking. */
84 static int prepare_socket_nonblock(int sock)
85 {
86         int flags;
87 #ifdef O_NONBLOCK
88 #define FLAG_TO_SET O_NONBLOCK
89 #else
90 #ifdef SYSV
91 #define FLAG_TO_SET O_NDELAY
92 #else /* BSD */
93 #define FLAG_TO_SET FNDELAY
94 #endif
95 #endif
96
97         flags = fcntl(sock, F_GETFL);
98         if (flags == -1) {
99                 return errno;
100         }
101         flags |= FLAG_TO_SET;
102         if (fcntl(sock, F_SETFL, flags) == -1) {
103                 return errno;
104         }
105
106 #undef FLAG_TO_SET
107         return 0;
108 }
109
110 /* Set socket close on exec. */
111 static int prepare_socket_cloexec(int sock)
112 {
113 #ifdef FD_CLOEXEC
114         int flags;
115
116         flags = fcntl(sock, F_GETFD, 0);
117         if (flags == -1) {
118                 return errno;
119         }
120         flags |= FD_CLOEXEC;
121         if (fcntl(sock, F_SETFD, flags) == -1) {
122                 return errno;
123         }
124 #endif
125         return 0;
126 }
127
128 /* Set socket non blocking and close on exec. */
129 static int prepare_socket(int sock)
130 {
131         int ret = prepare_socket_nonblock(sock);
132
133         if (ret) {
134                 return ret;
135         }
136         return prepare_socket_cloexec(sock);
137 }
138
139 static void close_fd_array(int *fds, size_t num_fds)
140 {
141         size_t i;
142
143         for (i = 0; i < num_fds; i++) {
144                 if (fds[i] == -1) {
145                         continue;
146                 }
147
148                 close(fds[i]);
149                 fds[i] = -1;
150         }
151 }
152
153 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
154                            const struct poll_funcs *ev_funcs,
155                            void (*recv_callback)(struct unix_dgram_ctx *ctx,
156                                                  uint8_t *msg, size_t msg_len,
157                                                  int *fds, size_t num_fds,
158                                                  void *private_data),
159                            void *private_data,
160                            struct unix_dgram_ctx **result)
161 {
162         struct unix_dgram_ctx *ctx;
163         size_t pathlen;
164         int ret;
165
166         if (addr != NULL) {
167                 pathlen = strlen(addr->sun_path)+1;
168         } else {
169                 pathlen = 1;
170         }
171
172         ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
173         if (ctx == NULL) {
174                 return ENOMEM;
175         }
176         if (addr != NULL) {
177                 memcpy(ctx->path, addr->sun_path, pathlen);
178         } else {
179                 ctx->path[0] = '\0';
180         }
181
182         *ctx = (struct unix_dgram_ctx) {
183                 .max_msg = max_msg,
184                 .ev_funcs = ev_funcs,
185                 .recv_callback = recv_callback,
186                 .private_data = private_data,
187                 .created_pid = (pid_t)-1
188         };
189
190         ctx->recv_buf = malloc(max_msg);
191         if (ctx->recv_buf == NULL) {
192                 free(ctx);
193                 return ENOMEM;
194         }
195
196         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
197         if (ctx->sock == -1) {
198                 ret = errno;
199                 goto fail_free;
200         }
201
202         /* Set non-blocking and close-on-exec. */
203         ret = prepare_socket(ctx->sock);
204         if (ret != 0) {
205                 goto fail_close;
206         }
207
208         if (addr != NULL) {
209                 ret = bind(ctx->sock,
210                            (const struct sockaddr *)(const void *)addr,
211                            sizeof(*addr));
212                 if (ret == -1) {
213                         ret = errno;
214                         goto fail_close;
215                 }
216
217                 ctx->created_pid = getpid();
218
219                 ctx->sock_read_watch = ctx->ev_funcs->watch_new(
220                         ctx->ev_funcs, ctx->sock, POLLIN,
221                         unix_dgram_recv_handler, ctx);
222
223                 if (ctx->sock_read_watch == NULL) {
224                         ret = ENOMEM;
225                         goto fail_close;
226                 }
227         }
228
229         *result = ctx;
230         return 0;
231
232 fail_close:
233         close(ctx->sock);
234 fail_free:
235         free(ctx->recv_buf);
236         free(ctx);
237         return ret;
238 }
239
240 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
241                                     void *private_data)
242 {
243         struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
244         ssize_t received;
245         int flags = 0;
246         struct msghdr msg;
247         struct iovec iov;
248 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
249         char buf[CMSG_SPACE(sizeof(int)*INT8_MAX)] = { 0, };
250         struct cmsghdr *cmsg;
251 #endif /* HAVE_STRUCT_MSGHDR_MSG_CONTROL */
252         int *fds = NULL;
253         size_t i, num_fds = 0;
254
255         iov = (struct iovec) {
256                 .iov_base = (void *)ctx->recv_buf,
257                 .iov_len = ctx->max_msg,
258         };
259
260         msg = (struct msghdr) {
261                 .msg_iov = &iov,
262                 .msg_iovlen = 1,
263 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
264                 .msg_control = buf,
265                 .msg_controllen = sizeof(buf),
266 #endif
267         };
268
269 #ifdef MSG_CMSG_CLOEXEC
270         flags |= MSG_CMSG_CLOEXEC;
271 #endif
272
273         received = recvmsg(fd, &msg, flags);
274         if (received == -1) {
275                 if ((errno == EAGAIN) ||
276                     (errno == EWOULDBLOCK) ||
277                     (errno == EINTR) || (errno == ENOMEM)) {
278                         /* Not really an error - just try again. */
279                         return;
280                 }
281                 /* Problem with the socket. Set it unreadable. */
282                 ctx->ev_funcs->watch_update(w, 0);
283                 return;
284         }
285         if (received > ctx->max_msg) {
286                 /* More than we expected, not for us */
287                 return;
288         }
289
290 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
291         for(cmsg = CMSG_FIRSTHDR(&msg); cmsg != NULL;
292             cmsg = CMSG_NXTHDR(&msg, cmsg))
293         {
294                 void *data = CMSG_DATA(cmsg);
295
296                 if (cmsg->cmsg_type != SCM_RIGHTS) {
297                         continue;
298                 }
299                 if (cmsg->cmsg_level != SOL_SOCKET) {
300                         continue;
301                 }
302
303                 fds = (int *)data;
304                 num_fds = (cmsg->cmsg_len - CMSG_LEN(0)) / sizeof (int);
305                 break;
306         }
307 #endif
308
309         for (i = 0; i < num_fds; i++) {
310                 int err;
311
312                 err = prepare_socket_cloexec(fds[i]);
313                 if (err != 0) {
314                         goto cleanup_fds;
315                 }
316         }
317
318         ctx->recv_callback(ctx, ctx->recv_buf, received,
319                            fds, num_fds, ctx->private_data);
320         return;
321
322 cleanup_fds:
323         close_fd_array(fds, num_fds);
324
325         ctx->recv_callback(ctx, ctx->recv_buf, received,
326                            NULL, 0, ctx->private_data);
327 }
328
329 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
330                                     void *private_data);
331
332 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
333 {
334         int ret, signalfd;
335
336         if (ctx->send_pool != NULL) {
337                 return 0;
338         }
339
340         ret = pthreadpool_init(0, &ctx->send_pool);
341         if (ret != 0) {
342                 return ret;
343         }
344
345         signalfd = pthreadpool_signal_fd(ctx->send_pool);
346
347         ctx->pool_read_watch = ctx->ev_funcs->watch_new(
348                 ctx->ev_funcs, signalfd, POLLIN,
349                 unix_dgram_job_finished, ctx);
350         if (ctx->pool_read_watch == NULL) {
351                 pthreadpool_destroy(ctx->send_pool);
352                 ctx->send_pool = NULL;
353                 return ENOMEM;
354         }
355
356         return 0;
357 }
358
359 static int unix_dgram_send_queue_init(
360         struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
361         struct unix_dgram_send_queue **result)
362 {
363         struct unix_dgram_send_queue *q;
364         size_t pathlen;
365         int ret, err;
366
367         pathlen = strlen(dst->sun_path)+1;
368
369         q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
370         if (q == NULL) {
371                 return ENOMEM;
372         }
373         q->ctx = ctx;
374         q->msgs = NULL;
375         memcpy(q->path, dst->sun_path, pathlen);
376
377         q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
378         if (q->sock == -1) {
379                 err = errno;
380                 goto fail_free;
381         }
382
383         err = prepare_socket_cloexec(q->sock);
384         if (err != 0) {
385                 goto fail_close;
386         }
387
388         do {
389                 ret = connect(q->sock,
390                               (const struct sockaddr *)(const void *)dst,
391                               sizeof(*dst));
392         } while ((ret == -1) && (errno == EINTR));
393
394         if (ret == -1) {
395                 err = errno;
396                 goto fail_close;
397         }
398
399         err = unix_dgram_init_pthreadpool(ctx);
400         if (err != 0) {
401                 goto fail_close;
402         }
403
404         DLIST_ADD(ctx->send_queues, q);
405
406         *result = q;
407         return 0;
408
409 fail_close:
410         close(q->sock);
411 fail_free:
412         free(q);
413         return err;
414 }
415
416 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
417 {
418         struct unix_dgram_ctx *ctx = q->ctx;
419
420         while (q->msgs != NULL) {
421                 struct unix_dgram_msg *msg;
422                 msg = q->msgs;
423                 DLIST_REMOVE(q->msgs, msg);
424                 free(msg);
425         }
426         close(q->sock);
427         DLIST_REMOVE(ctx->send_queues, q);
428         free(q);
429 }
430
431 static struct unix_dgram_send_queue *find_send_queue(
432         struct unix_dgram_ctx *ctx, const char *dst_sock)
433 {
434         struct unix_dgram_send_queue *s;
435
436         for (s = ctx->send_queues; s != NULL; s = s->next) {
437                 if (strcmp(s->path, dst_sock) == 0) {
438                         return s;
439                 }
440         }
441         return NULL;
442 }
443
444 static int queue_msg(struct unix_dgram_send_queue *q,
445                      const struct iovec *iov, int iovlen)
446 {
447         struct unix_dgram_msg *msg;
448         ssize_t buflen;
449         size_t msglen;
450         int i;
451
452         buflen = iov_buflen(iov, iovlen);
453         if (buflen == -1) {
454                 return EINVAL;
455         }
456
457         msglen = offsetof(struct unix_dgram_msg, buf) + buflen;
458         if ((msglen < buflen) ||
459             (msglen < offsetof(struct unix_dgram_msg, buf))) {
460                 /* overflow */
461                 return EINVAL;
462         }
463
464         msg = malloc(msglen);
465         if (msg == NULL) {
466                 return ENOMEM;
467         }
468         msg->buflen = buflen;
469         msg->sock = q->sock;
470
471         buflen = 0;
472         for (i=0; i<iovlen; i++) {
473                 memcpy(&msg->buf[buflen], iov[i].iov_base, iov[i].iov_len);
474                 buflen += iov[i].iov_len;
475         }
476
477         DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
478         return 0;
479 }
480
481 static void unix_dgram_send_job(void *private_data)
482 {
483         struct unix_dgram_msg *dmsg = private_data;
484         struct iovec iov = {
485                 .iov_base = (void *)dmsg->buf,
486                 .iov_len = dmsg->buflen,
487         };
488         struct msghdr msg = {
489                 .msg_iov = &iov,
490                 .msg_iovlen = 1,
491         };
492
493         do {
494                 dmsg->sent = sendmsg(dmsg->sock, &msg, 0);
495         } while ((dmsg->sent == -1) && (errno == EINTR));
496 }
497
498 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
499                                     void *private_data)
500 {
501         struct unix_dgram_ctx *ctx = private_data;
502         struct unix_dgram_send_queue *q;
503         struct unix_dgram_msg *msg;
504         int ret, job;
505
506         ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
507         if (ret != 1) {
508                 return;
509         }
510
511         for (q = ctx->send_queues; q != NULL; q = q->next) {
512                 if (job == q->sock) {
513                         break;
514                 }
515         }
516
517         if (q == NULL) {
518                 /* Huh? Should not happen */
519                 return;
520         }
521
522         msg = q->msgs;
523         DLIST_REMOVE(q->msgs, msg);
524         free(msg);
525
526         if (q->msgs != NULL) {
527                 ret = pthreadpool_add_job(ctx->send_pool, q->sock,
528                                           unix_dgram_send_job, q->msgs);
529                 if (ret == 0) {
530                         return;
531                 }
532         }
533
534         unix_dgram_send_queue_free(q);
535 }
536
537 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
538                            const struct sockaddr_un *dst,
539                            const struct iovec *iov, int iovlen)
540 {
541         struct unix_dgram_send_queue *q;
542         struct msghdr msg;
543         int ret;
544
545         /*
546          * To preserve message ordering, we have to queue a message when
547          * others are waiting in line already.
548          */
549         q = find_send_queue(ctx, dst->sun_path);
550         if (q != NULL) {
551                 return queue_msg(q, iov, iovlen);
552         }
553
554         /*
555          * Try a cheap nonblocking send
556          */
557
558         msg = (struct msghdr) {
559                 .msg_name = discard_const_p(struct sockaddr_un, dst),
560                 .msg_namelen = sizeof(*dst),
561                 .msg_iov = discard_const_p(struct iovec, iov),
562                 .msg_iovlen = iovlen
563         };
564
565         ret = sendmsg(ctx->sock, &msg, 0);
566         if (ret >= 0) {
567                 return 0;
568         }
569         if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
570                 return errno;
571         }
572
573         ret = unix_dgram_send_queue_init(ctx, dst, &q);
574         if (ret != 0) {
575                 return ret;
576         }
577         ret = queue_msg(q, iov, iovlen);
578         if (ret != 0) {
579                 unix_dgram_send_queue_free(q);
580                 return ret;
581         }
582         ret = pthreadpool_add_job(ctx->send_pool, q->sock,
583                                   unix_dgram_send_job, q->msgs);
584         if (ret != 0) {
585                 unix_dgram_send_queue_free(q);
586                 return ret;
587         }
588         return 0;
589 }
590
591 static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
592 {
593         return ctx->sock;
594 }
595
596 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
597 {
598         if (ctx->send_queues != NULL) {
599                 return EBUSY;
600         }
601
602         if (ctx->send_pool != NULL) {
603                 int ret = pthreadpool_destroy(ctx->send_pool);
604                 if (ret != 0) {
605                         return ret;
606                 }
607                 ctx->ev_funcs->watch_free(ctx->pool_read_watch);
608         }
609
610         ctx->ev_funcs->watch_free(ctx->sock_read_watch);
611
612         if (getpid() == ctx->created_pid) {
613                 /* If we created it, unlink. Otherwise someone else might
614                  * still have it open */
615                 unlink(ctx->path);
616         }
617
618         close(ctx->sock);
619         free(ctx->recv_buf);
620         free(ctx);
621         return 0;
622 }
623
624 /*
625  * Every message starts with a uint64_t cookie.
626  *
627  * A value of 0 indicates a single-fragment message which is complete in
628  * itself. The data immediately follows the cookie.
629  *
630  * Every multi-fragment message has a cookie != 0 and starts with a cookie
631  * followed by a struct unix_msg_header and then the data. The pid and sock
632  * fields are used to assure uniqueness on the receiver side.
633  */
634
635 struct unix_msg_hdr {
636         size_t msglen;
637         pid_t pid;
638         int sock;
639 };
640
641 struct unix_msg {
642         struct unix_msg *prev, *next;
643         size_t msglen;
644         size_t received;
645         pid_t sender_pid;
646         int sender_sock;
647         uint64_t cookie;
648         uint8_t buf[1];
649 };
650
651 struct unix_msg_ctx {
652         struct unix_dgram_ctx *dgram;
653         size_t fragment_len;
654         uint64_t cookie;
655
656         void (*recv_callback)(struct unix_msg_ctx *ctx,
657                               uint8_t *msg, size_t msg_len,
658                               void *private_data);
659         void *private_data;
660
661         struct unix_msg *msgs;
662 };
663
664 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
665                           uint8_t *buf, size_t buflen,
666                           int *fds, size_t num_fds,
667                           void *private_data);
668
669 int unix_msg_init(const struct sockaddr_un *addr,
670                   const struct poll_funcs *ev_funcs,
671                   size_t fragment_len, uint64_t cookie,
672                   void (*recv_callback)(struct unix_msg_ctx *ctx,
673                                         uint8_t *msg, size_t msg_len,
674                                         void *private_data),
675                   void *private_data,
676                   struct unix_msg_ctx **result)
677 {
678         struct unix_msg_ctx *ctx;
679         int ret;
680
681         ctx = malloc(sizeof(*ctx));
682         if (ctx == NULL) {
683                 return ENOMEM;
684         }
685
686         *ctx = (struct unix_msg_ctx) {
687                 .fragment_len = fragment_len,
688                 .cookie = cookie,
689                 .recv_callback = recv_callback,
690                 .private_data = private_data
691         };
692
693         ret = unix_dgram_init(addr, fragment_len, ev_funcs,
694                               unix_msg_recv, ctx, &ctx->dgram);
695         if (ret != 0) {
696                 free(ctx);
697                 return ret;
698         }
699
700         *result = ctx;
701         return 0;
702 }
703
704 int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
705                   const struct iovec *iov, int iovlen)
706 {
707         ssize_t msglen;
708         size_t sent;
709         int ret = 0;
710         struct iovec iov_copy[iovlen+2];
711         struct unix_msg_hdr hdr;
712         struct iovec src_iov;
713
714         if (iovlen < 0) {
715                 return EINVAL;
716         }
717
718         msglen = iov_buflen(iov, iovlen);
719         if (msglen == -1) {
720                 return EINVAL;
721         }
722
723         if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
724                 uint64_t cookie = 0;
725
726                 iov_copy[0].iov_base = &cookie;
727                 iov_copy[0].iov_len = sizeof(cookie);
728                 if (iovlen > 0) {
729                         memcpy(&iov_copy[1], iov,
730                                sizeof(struct iovec) * iovlen);
731                 }
732
733                 return unix_dgram_send(ctx->dgram, dst, iov_copy, iovlen+1);
734         }
735
736         hdr = (struct unix_msg_hdr) {
737                 .msglen = msglen,
738                 .pid = getpid(),
739                 .sock = unix_dgram_sock(ctx->dgram)
740         };
741
742         iov_copy[0].iov_base = &ctx->cookie;
743         iov_copy[0].iov_len = sizeof(ctx->cookie);
744         iov_copy[1].iov_base = &hdr;
745         iov_copy[1].iov_len = sizeof(hdr);
746
747         sent = 0;
748         src_iov = iov[0];
749
750         /*
751          * The following write loop sends the user message in pieces. We have
752          * filled the first two iovecs above with "cookie" and "hdr". In the
753          * following loops we pull message chunks from the user iov array and
754          * fill iov_copy piece by piece, possibly truncating chunks from the
755          * caller's iov array. Ugly, but hopefully efficient.
756          */
757
758         while (sent < msglen) {
759                 size_t fragment_len;
760                 size_t iov_index = 2;
761
762                 fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
763
764                 while (fragment_len < ctx->fragment_len) {
765                         size_t space, chunk;
766
767                         space = ctx->fragment_len - fragment_len;
768                         chunk = MIN(space, src_iov.iov_len);
769
770                         iov_copy[iov_index].iov_base = src_iov.iov_base;
771                         iov_copy[iov_index].iov_len = chunk;
772                         iov_index += 1;
773
774                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
775                         src_iov.iov_len -= chunk;
776                         fragment_len += chunk;
777
778                         if (src_iov.iov_len == 0) {
779                                 iov += 1;
780                                 iovlen -= 1;
781                                 if (iovlen == 0) {
782                                         break;
783                                 }
784                                 src_iov = iov[0];
785                         }
786                 }
787                 sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
788
789                 ret = unix_dgram_send(ctx->dgram, dst, iov_copy, iov_index);
790                 if (ret != 0) {
791                         break;
792                 }
793         }
794
795         ctx->cookie += 1;
796         if (ctx->cookie == 0) {
797                 ctx->cookie += 1;
798         }
799
800         return ret;
801 }
802
803 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
804                           uint8_t *buf, size_t buflen,
805                           int *fds, size_t num_fds,
806                           void *private_data)
807 {
808         struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
809         struct unix_msg_hdr hdr;
810         struct unix_msg *msg;
811         size_t space;
812         uint64_t cookie;
813
814         /* for now we ignore passed file descriptors */
815         close_fd_array(fds, num_fds);
816
817         if (buflen < sizeof(cookie)) {
818                 return;
819         }
820         memcpy(&cookie, buf, sizeof(cookie));
821
822         buf += sizeof(cookie);
823         buflen -= sizeof(cookie);
824
825         if (cookie == 0) {
826                 ctx->recv_callback(ctx, buf, buflen, ctx->private_data);
827                 return;
828         }
829
830         if (buflen < sizeof(hdr)) {
831                 return;
832         }
833         memcpy(&hdr, buf, sizeof(hdr));
834
835         buf += sizeof(hdr);
836         buflen -= sizeof(hdr);
837
838         for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
839                 if ((msg->sender_pid == hdr.pid) &&
840                     (msg->sender_sock == hdr.sock)) {
841                         break;
842                 }
843         }
844
845         if ((msg != NULL) && (msg->cookie != cookie)) {
846                 DLIST_REMOVE(ctx->msgs, msg);
847                 free(msg);
848                 msg = NULL;
849         }
850
851         if (msg == NULL) {
852                 msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
853                 if (msg == NULL) {
854                         return;
855                 }
856                 *msg = (struct unix_msg) {
857                         .msglen = hdr.msglen,
858                         .sender_pid = hdr.pid,
859                         .sender_sock = hdr.sock,
860                         .cookie = cookie
861                 };
862                 DLIST_ADD(ctx->msgs, msg);
863         }
864
865         space = msg->msglen - msg->received;
866         if (buflen > space) {
867                 return;
868         }
869
870         memcpy(msg->buf + msg->received, buf, buflen);
871         msg->received += buflen;
872
873         if (msg->received < msg->msglen) {
874                 return;
875         }
876
877         DLIST_REMOVE(ctx->msgs, msg);
878         ctx->recv_callback(ctx, msg->buf, msg->msglen, ctx->private_data);
879         free(msg);
880 }
881
882 int unix_msg_free(struct unix_msg_ctx *ctx)
883 {
884         int ret;
885
886         ret = unix_dgram_free(ctx->dgram);
887         if (ret != 0) {
888                 return ret;
889         }
890
891         while (ctx->msgs != NULL) {
892                 struct unix_msg *msg = ctx->msgs;
893                 DLIST_REMOVE(ctx->msgs, msg);
894                 free(msg);
895         }
896
897         free(ctx);
898         return 0;
899 }
900
901 static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
902 {
903         size_t buflen = 0;
904         int i;
905
906         for (i=0; i<iovlen; i++) {
907                 size_t thislen = iov[i].iov_len;
908                 size_t tmp = buflen + thislen;
909
910                 if ((tmp < buflen) || (tmp < thislen)) {
911                         /* overflow */
912                         return -1;
913                 }
914                 buflen = tmp;
915         }
916         return buflen;
917 }