unix_msg: Use struct initializers
[obnox/samba/samba-obnox.git] / source3 / lib / unix_msg / unix_msg.c
1 /*
2  * Unix SMB/CIFS implementation.
3  * Copyright (C) Volker Lendecke 2013
4  *
5  * This program is free software; you can redistribute it and/or modify
6  * it under the terms of the GNU General Public License as published by
7  * the Free Software Foundation; either version 3 of the License, or
8  * (at your option) any later version.
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
13  * GNU General Public License for more details.
14  *
15  * You should have received a copy of the GNU General Public License
16  * along with this program.  If not, see <http://www.gnu.org/licenses/>.
17  */
18
19 #include "replace.h"
20 #include "unix_msg.h"
21 #include "system/select.h"
22 #include "system/time.h"
23 #include "system/network.h"
24 #include "dlinklist.h"
25 #include "pthreadpool/pthreadpool.h"
26 #include <fcntl.h>
27
28 /*
29  * This file implements two abstractions: The "unix_dgram" functions implement
30  * queueing for unix domain datagram sockets. You can send to a destination
31  * socket, and if that has no free space available, it will fall back to an
32  * anonymous socket that will poll for writability. "unix_dgram" expects the
33  * data size not to exceed the system limit.
34  *
35  * The "unix_msg" functions implement the fragmentation of large messages on
36  * top of "unix_dgram". This is what is exposed to the user of this API.
37  */
38
39 struct unix_dgram_msg {
40         struct unix_dgram_msg *prev, *next;
41
42         int sock;
43         ssize_t sent;
44         int sys_errno;
45         size_t buflen;
46         uint8_t buf[];
47 };
48
49 struct unix_dgram_send_queue {
50         struct unix_dgram_send_queue *prev, *next;
51         struct unix_dgram_ctx *ctx;
52         int sock;
53         struct unix_dgram_msg *msgs;
54         char path[];
55 };
56
57 struct unix_dgram_ctx {
58         int sock;
59         pid_t created_pid;
60         const struct poll_funcs *ev_funcs;
61         size_t max_msg;
62
63         void (*recv_callback)(struct unix_dgram_ctx *ctx,
64                               uint8_t *msg, size_t msg_len,
65                               void *private_data);
66         void *private_data;
67
68         struct poll_watch *sock_read_watch;
69         struct unix_dgram_send_queue *send_queues;
70
71         struct pthreadpool *send_pool;
72         struct poll_watch *pool_read_watch;
73
74         uint8_t *recv_buf;
75         char path[];
76 };
77
78 static ssize_t iov_buflen(const struct iovec *iov, int iovlen);
79 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
80                                     void *private_data);
81
82 /* Set socket non blocking. */
83 static int prepare_socket_nonblock(int sock)
84 {
85         int flags;
86 #ifdef O_NONBLOCK
87 #define FLAG_TO_SET O_NONBLOCK
88 #else
89 #ifdef SYSV
90 #define FLAG_TO_SET O_NDELAY
91 #else /* BSD */
92 #define FLAG_TO_SET FNDELAY
93 #endif
94 #endif
95
96         flags = fcntl(sock, F_GETFL);
97         if (flags == -1) {
98                 return errno;
99         }
100         flags |= FLAG_TO_SET;
101         if (fcntl(sock, F_SETFL, flags) == -1) {
102                 return errno;
103         }
104
105 #undef FLAG_TO_SET
106         return 0;
107 }
108
109 /* Set socket close on exec. */
110 static int prepare_socket_cloexec(int sock)
111 {
112 #ifdef FD_CLOEXEC
113         int flags;
114
115         flags = fcntl(sock, F_GETFD, 0);
116         if (flags == -1) {
117                 return errno;
118         }
119         flags |= FD_CLOEXEC;
120         if (fcntl(sock, F_SETFD, flags) == -1) {
121                 return errno;
122         }
123 #endif
124         return 0;
125 }
126
127 /* Set socket non blocking and close on exec. */
128 static int prepare_socket(int sock)
129 {
130         int ret = prepare_socket_nonblock(sock);
131
132         if (ret) {
133                 return ret;
134         }
135         return prepare_socket_cloexec(sock);
136 }
137
138 static int unix_dgram_init(const struct sockaddr_un *addr, size_t max_msg,
139                            const struct poll_funcs *ev_funcs,
140                            void (*recv_callback)(struct unix_dgram_ctx *ctx,
141                                                  uint8_t *msg, size_t msg_len,
142                                                  void *private_data),
143                            void *private_data,
144                            struct unix_dgram_ctx **result)
145 {
146         struct unix_dgram_ctx *ctx;
147         size_t pathlen;
148         int ret;
149
150         if (addr != NULL) {
151                 pathlen = strlen(addr->sun_path)+1;
152         } else {
153                 pathlen = 1;
154         }
155
156         ctx = malloc(offsetof(struct unix_dgram_ctx, path) + pathlen);
157         if (ctx == NULL) {
158                 return ENOMEM;
159         }
160         if (addr != NULL) {
161                 memcpy(ctx->path, addr->sun_path, pathlen);
162         } else {
163                 ctx->path[0] = '\0';
164         }
165
166         *ctx = (struct unix_dgram_ctx) {
167                 .max_msg = max_msg,
168                 .ev_funcs = ev_funcs,
169                 .recv_callback = recv_callback,
170                 .private_data = private_data,
171                 .created_pid = (pid_t)-1
172         };
173
174         ctx->recv_buf = malloc(max_msg);
175         if (ctx->recv_buf == NULL) {
176                 free(ctx);
177                 return ENOMEM;
178         }
179
180         ctx->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
181         if (ctx->sock == -1) {
182                 ret = errno;
183                 goto fail_free;
184         }
185
186         /* Set non-blocking and close-on-exec. */
187         ret = prepare_socket(ctx->sock);
188         if (ret != 0) {
189                 goto fail_close;
190         }
191
192         if (addr != NULL) {
193                 ret = bind(ctx->sock,
194                            (const struct sockaddr *)(const void *)addr,
195                            sizeof(*addr));
196                 if (ret == -1) {
197                         ret = errno;
198                         goto fail_close;
199                 }
200
201                 ctx->created_pid = getpid();
202
203                 ctx->sock_read_watch = ctx->ev_funcs->watch_new(
204                         ctx->ev_funcs, ctx->sock, POLLIN,
205                         unix_dgram_recv_handler, ctx);
206
207                 if (ctx->sock_read_watch == NULL) {
208                         ret = ENOMEM;
209                         goto fail_close;
210                 }
211         }
212
213         *result = ctx;
214         return 0;
215
216 fail_close:
217         close(ctx->sock);
218 fail_free:
219         free(ctx->recv_buf);
220         free(ctx);
221         return ret;
222 }
223
224 static void unix_dgram_recv_handler(struct poll_watch *w, int fd, short events,
225                                     void *private_data)
226 {
227         struct unix_dgram_ctx *ctx = (struct unix_dgram_ctx *)private_data;
228         ssize_t received;
229         struct msghdr msg;
230         struct iovec iov;
231
232         iov = (struct iovec) {
233                 .iov_base = (void *)ctx->recv_buf,
234                 .iov_len = ctx->max_msg,
235         };
236
237         msg = (struct msghdr) {
238                 .msg_iov = &iov,
239                 .msg_iovlen = 1,
240 #ifdef HAVE_STRUCT_MSGHDR_MSG_CONTROL
241                 .msg_control = NULL,
242                 .msg_controllen = 0,
243 #endif
244         };
245
246         received = recvmsg(fd, &msg, 0);
247         if (received == -1) {
248                 if ((errno == EAGAIN) ||
249 #ifdef EWOULDBLOCK
250                     (errno == EWOULDBLOCK) ||
251 #endif
252                     (errno == EINTR) || (errno == ENOMEM)) {
253                         /* Not really an error - just try again. */
254                         return;
255                 }
256                 /* Problem with the socket. Set it unreadable. */
257                 ctx->ev_funcs->watch_update(w, 0);
258                 return;
259         }
260         if (received > ctx->max_msg) {
261                 /* More than we expected, not for us */
262                 return;
263         }
264         ctx->recv_callback(ctx, ctx->recv_buf, received, ctx->private_data);
265 }
266
267 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
268                                     void *private_data);
269
270 static int unix_dgram_init_pthreadpool(struct unix_dgram_ctx *ctx)
271 {
272         int ret, signalfd;
273
274         if (ctx->send_pool != NULL) {
275                 return 0;
276         }
277
278         ret = pthreadpool_init(0, &ctx->send_pool);
279         if (ret != 0) {
280                 return ret;
281         }
282
283         signalfd = pthreadpool_signal_fd(ctx->send_pool);
284
285         ctx->pool_read_watch = ctx->ev_funcs->watch_new(
286                 ctx->ev_funcs, signalfd, POLLIN,
287                 unix_dgram_job_finished, ctx);
288         if (ctx->pool_read_watch == NULL) {
289                 pthreadpool_destroy(ctx->send_pool);
290                 ctx->send_pool = NULL;
291                 return ENOMEM;
292         }
293
294         return 0;
295 }
296
297 static int unix_dgram_send_queue_init(
298         struct unix_dgram_ctx *ctx, const struct sockaddr_un *dst,
299         struct unix_dgram_send_queue **result)
300 {
301         struct unix_dgram_send_queue *q;
302         size_t pathlen;
303         int ret, err;
304
305         pathlen = strlen(dst->sun_path)+1;
306
307         q = malloc(offsetof(struct unix_dgram_send_queue, path) + pathlen);
308         if (q == NULL) {
309                 return ENOMEM;
310         }
311         q->ctx = ctx;
312         q->msgs = NULL;
313         memcpy(q->path, dst->sun_path, pathlen);
314
315         q->sock = socket(AF_UNIX, SOCK_DGRAM, 0);
316         if (q->sock == -1) {
317                 err = errno;
318                 goto fail_free;
319         }
320
321         err = prepare_socket_cloexec(q->sock);
322         if (err != 0) {
323                 goto fail_close;
324         }
325
326         do {
327                 ret = connect(q->sock,
328                               (const struct sockaddr *)(const void *)dst,
329                               sizeof(*dst));
330         } while ((ret == -1) && (errno == EINTR));
331
332         if (ret == -1) {
333                 err = errno;
334                 goto fail_close;
335         }
336
337         err = unix_dgram_init_pthreadpool(ctx);
338         if (err != 0) {
339                 goto fail_close;
340         }
341
342         DLIST_ADD(ctx->send_queues, q);
343
344         *result = q;
345         return 0;
346
347 fail_close:
348         close(q->sock);
349 fail_free:
350         free(q);
351         return err;
352 }
353
354 static void unix_dgram_send_queue_free(struct unix_dgram_send_queue *q)
355 {
356         struct unix_dgram_ctx *ctx = q->ctx;
357
358         while (q->msgs != NULL) {
359                 struct unix_dgram_msg *msg;
360                 msg = q->msgs;
361                 DLIST_REMOVE(q->msgs, msg);
362                 free(msg);
363         }
364         close(q->sock);
365         DLIST_REMOVE(ctx->send_queues, q);
366         free(q);
367 }
368
369 static struct unix_dgram_send_queue *find_send_queue(
370         struct unix_dgram_ctx *ctx, const char *dst_sock)
371 {
372         struct unix_dgram_send_queue *s;
373
374         for (s = ctx->send_queues; s != NULL; s = s->next) {
375                 if (strcmp(s->path, dst_sock) == 0) {
376                         return s;
377                 }
378         }
379         return NULL;
380 }
381
382 static int queue_msg(struct unix_dgram_send_queue *q,
383                      const struct iovec *iov, int iovlen)
384 {
385         struct unix_dgram_msg *msg;
386         ssize_t buflen;
387         size_t msglen;
388         int i;
389
390         buflen = iov_buflen(iov, iovlen);
391         if (buflen == -1) {
392                 return EINVAL;
393         }
394
395         msglen = offsetof(struct unix_dgram_msg, buf) + buflen;
396         if ((msglen < buflen) ||
397             (msglen < offsetof(struct unix_dgram_msg, buf))) {
398                 /* overflow */
399                 return EINVAL;
400         }
401
402         msg = malloc(msglen);
403         if (msg == NULL) {
404                 return ENOMEM;
405         }
406         msg->buflen = buflen;
407         msg->sock = q->sock;
408
409         buflen = 0;
410         for (i=0; i<iovlen; i++) {
411                 memcpy(&msg->buf[buflen], iov[i].iov_base, iov[i].iov_len);
412                 buflen += iov[i].iov_len;
413         }
414
415         DLIST_ADD_END(q->msgs, msg, struct unix_dgram_msg);
416         return 0;
417 }
418
419 static void unix_dgram_send_job(void *private_data)
420 {
421         struct unix_dgram_msg *msg = private_data;
422
423         do {
424                 msg->sent = send(msg->sock, msg->buf, msg->buflen, 0);
425         } while ((msg->sent == -1) && (errno == EINTR));
426 }
427
428 static void unix_dgram_job_finished(struct poll_watch *w, int fd, short events,
429                                     void *private_data)
430 {
431         struct unix_dgram_ctx *ctx = private_data;
432         struct unix_dgram_send_queue *q;
433         struct unix_dgram_msg *msg;
434         int ret, job;
435
436         ret = pthreadpool_finished_jobs(ctx->send_pool, &job, 1);
437         if (ret != 1) {
438                 return;
439         }
440
441         for (q = ctx->send_queues; q != NULL; q = q->next) {
442                 if (job == q->sock) {
443                         break;
444                 }
445         }
446
447         if (q == NULL) {
448                 /* Huh? Should not happen */
449                 return;
450         }
451
452         msg = q->msgs;
453         DLIST_REMOVE(q->msgs, msg);
454         free(msg);
455
456         if (q->msgs != NULL) {
457                 ret = pthreadpool_add_job(ctx->send_pool, q->sock,
458                                           unix_dgram_send_job, q->msgs);
459                 if (ret == 0) {
460                         return;
461                 }
462         }
463
464         unix_dgram_send_queue_free(q);
465 }
466
467 static int unix_dgram_send(struct unix_dgram_ctx *ctx,
468                            const struct sockaddr_un *dst,
469                            const struct iovec *iov, int iovlen)
470 {
471         struct unix_dgram_send_queue *q;
472         struct msghdr msg;
473         int ret;
474
475         /*
476          * To preserve message ordering, we have to queue a message when
477          * others are waiting in line already.
478          */
479         q = find_send_queue(ctx, dst->sun_path);
480         if (q != NULL) {
481                 return queue_msg(q, iov, iovlen);
482         }
483
484         /*
485          * Try a cheap nonblocking send
486          */
487
488         msg = (struct msghdr) {
489                 .msg_name = discard_const_p(struct sockaddr_un, dst),
490                 .msg_namelen = sizeof(*dst),
491                 .msg_iov = discard_const_p(struct iovec, iov),
492                 .msg_iovlen = iovlen
493         };
494
495         ret = sendmsg(ctx->sock, &msg, 0);
496         if (ret >= 0) {
497                 return 0;
498         }
499 #ifdef EWOULDBLOCK
500         if ((errno != EWOULDBLOCK) && (errno != EAGAIN) && (errno != EINTR)) {
501 #else
502         if ((errno != EAGAIN) && (errno != EINTR)) {
503 #endif
504                 return errno;
505         }
506
507         ret = unix_dgram_send_queue_init(ctx, dst, &q);
508         if (ret != 0) {
509                 return ret;
510         }
511         ret = queue_msg(q, iov, iovlen);
512         if (ret != 0) {
513                 unix_dgram_send_queue_free(q);
514                 return ret;
515         }
516         ret = pthreadpool_add_job(ctx->send_pool, q->sock,
517                                   unix_dgram_send_job, q->msgs);
518         if (ret != 0) {
519                 unix_dgram_send_queue_free(q);
520                 return ret;
521         }
522         return 0;
523 }
524
525 static int unix_dgram_sock(struct unix_dgram_ctx *ctx)
526 {
527         return ctx->sock;
528 }
529
530 static int unix_dgram_free(struct unix_dgram_ctx *ctx)
531 {
532         if (ctx->send_queues != NULL) {
533                 return EBUSY;
534         }
535
536         if (ctx->send_pool != NULL) {
537                 int ret = pthreadpool_destroy(ctx->send_pool);
538                 if (ret != 0) {
539                         return ret;
540                 }
541                 ctx->ev_funcs->watch_free(ctx->pool_read_watch);
542         }
543
544         ctx->ev_funcs->watch_free(ctx->sock_read_watch);
545
546         if (getpid() == ctx->created_pid) {
547                 /* If we created it, unlink. Otherwise someone else might
548                  * still have it open */
549                 unlink(ctx->path);
550         }
551
552         close(ctx->sock);
553         free(ctx->recv_buf);
554         free(ctx);
555         return 0;
556 }
557
558 /*
559  * Every message starts with a uint64_t cookie.
560  *
561  * A value of 0 indicates a single-fragment message which is complete in
562  * itself. The data immediately follows the cookie.
563  *
564  * Every multi-fragment message has a cookie != 0 and starts with a cookie
565  * followed by a struct unix_msg_header and then the data. The pid and sock
566  * fields are used to assure uniqueness on the receiver side.
567  */
568
569 struct unix_msg_hdr {
570         size_t msglen;
571         pid_t pid;
572         int sock;
573 };
574
575 struct unix_msg {
576         struct unix_msg *prev, *next;
577         size_t msglen;
578         size_t received;
579         pid_t sender_pid;
580         int sender_sock;
581         uint64_t cookie;
582         uint8_t buf[1];
583 };
584
585 struct unix_msg_ctx {
586         struct unix_dgram_ctx *dgram;
587         size_t fragment_len;
588         uint64_t cookie;
589
590         void (*recv_callback)(struct unix_msg_ctx *ctx,
591                               uint8_t *msg, size_t msg_len,
592                               void *private_data);
593         void *private_data;
594
595         struct unix_msg *msgs;
596 };
597
598 static void unix_msg_recv(struct unix_dgram_ctx *ctx,
599                           uint8_t *msg, size_t msg_len,
600                           void *private_data);
601
602 int unix_msg_init(const struct sockaddr_un *addr,
603                   const struct poll_funcs *ev_funcs,
604                   size_t fragment_len, uint64_t cookie,
605                   void (*recv_callback)(struct unix_msg_ctx *ctx,
606                                         uint8_t *msg, size_t msg_len,
607                                         void *private_data),
608                   void *private_data,
609                   struct unix_msg_ctx **result)
610 {
611         struct unix_msg_ctx *ctx;
612         int ret;
613
614         ctx = malloc(sizeof(*ctx));
615         if (ctx == NULL) {
616                 return ENOMEM;
617         }
618
619         *ctx = (struct unix_msg_ctx) {
620                 .fragment_len = fragment_len,
621                 .cookie = cookie,
622                 .recv_callback = recv_callback,
623                 .private_data = private_data
624         };
625
626         ret = unix_dgram_init(addr, fragment_len, ev_funcs,
627                               unix_msg_recv, ctx, &ctx->dgram);
628         if (ret != 0) {
629                 free(ctx);
630                 return ret;
631         }
632
633         *result = ctx;
634         return 0;
635 }
636
637 int unix_msg_send(struct unix_msg_ctx *ctx, const struct sockaddr_un *dst,
638                   const struct iovec *iov, int iovlen)
639 {
640         ssize_t msglen;
641         size_t sent;
642         int ret = 0;
643         struct iovec *iov_copy;
644         struct unix_msg_hdr hdr;
645         struct iovec src_iov;
646
647         if (iovlen < 0) {
648                 return EINVAL;
649         }
650
651         msglen = iov_buflen(iov, iovlen);
652         if (msglen == -1) {
653                 return EINVAL;
654         }
655
656         if (msglen <= (ctx->fragment_len - sizeof(uint64_t))) {
657                 struct iovec tmp_iov[iovlen+1];
658                 uint64_t cookie = 0;
659
660                 tmp_iov[0].iov_base = &cookie;
661                 tmp_iov[0].iov_len = sizeof(cookie);
662                 if (iovlen > 0) {
663                         memcpy(&tmp_iov[1], iov,
664                                sizeof(struct iovec) * iovlen);
665                 }
666
667                 return unix_dgram_send(ctx->dgram, dst, tmp_iov, iovlen+1);
668         }
669
670         hdr = (struct unix_msg_hdr) {
671                 .msglen = msglen,
672                 .pid = getpid(),
673                 .sock = unix_dgram_sock(ctx->dgram)
674         };
675
676         iov_copy = malloc(sizeof(struct iovec) * (iovlen + 2));
677         if (iov_copy == NULL) {
678                 return ENOMEM;
679         }
680         iov_copy[0].iov_base = &ctx->cookie;
681         iov_copy[0].iov_len = sizeof(ctx->cookie);
682         iov_copy[1].iov_base = &hdr;
683         iov_copy[1].iov_len = sizeof(hdr);
684
685         sent = 0;
686         src_iov = iov[0];
687
688         /*
689          * The following write loop sends the user message in pieces. We have
690          * filled the first two iovecs above with "cookie" and "hdr". In the
691          * following loops we pull message chunks from the user iov array and
692          * fill iov_copy piece by piece, possibly truncating chunks from the
693          * caller's iov array. Ugly, but hopefully efficient.
694          */
695
696         while (sent < msglen) {
697                 size_t fragment_len;
698                 size_t iov_index = 2;
699
700                 fragment_len = sizeof(ctx->cookie) + sizeof(hdr);
701
702                 while (fragment_len < ctx->fragment_len) {
703                         size_t space, chunk;
704
705                         space = ctx->fragment_len - fragment_len;
706                         chunk = MIN(space, src_iov.iov_len);
707
708                         iov_copy[iov_index].iov_base = src_iov.iov_base;
709                         iov_copy[iov_index].iov_len = chunk;
710                         iov_index += 1;
711
712                         src_iov.iov_base = (char *)src_iov.iov_base + chunk;
713                         src_iov.iov_len -= chunk;
714                         fragment_len += chunk;
715
716                         if (src_iov.iov_len == 0) {
717                                 iov += 1;
718                                 iovlen -= 1;
719                                 if (iovlen == 0) {
720                                         break;
721                                 }
722                                 src_iov = iov[0];
723                         }
724                 }
725                 sent += (fragment_len - sizeof(ctx->cookie) - sizeof(hdr));
726
727                 ret = unix_dgram_send(ctx->dgram, dst, iov_copy, iov_index);
728                 if (ret != 0) {
729                         break;
730                 }
731         }
732
733         free(iov_copy);
734
735         ctx->cookie += 1;
736         if (ctx->cookie == 0) {
737                 ctx->cookie += 1;
738         }
739
740         return ret;
741 }
742
743 static void unix_msg_recv(struct unix_dgram_ctx *dgram_ctx,
744                           uint8_t *buf, size_t buflen,
745                           void *private_data)
746 {
747         struct unix_msg_ctx *ctx = (struct unix_msg_ctx *)private_data;
748         struct unix_msg_hdr hdr;
749         struct unix_msg *msg;
750         size_t space;
751         uint64_t cookie;
752
753         if (buflen < sizeof(cookie)) {
754                 return;
755         }
756         memcpy(&cookie, buf, sizeof(cookie));
757
758         buf += sizeof(cookie);
759         buflen -= sizeof(cookie);
760
761         if (cookie == 0) {
762                 ctx->recv_callback(ctx, buf, buflen, ctx->private_data);
763                 return;
764         }
765
766         if (buflen < sizeof(hdr)) {
767                 return;
768         }
769         memcpy(&hdr, buf, sizeof(hdr));
770
771         buf += sizeof(hdr);
772         buflen -= sizeof(hdr);
773
774         for (msg = ctx->msgs; msg != NULL; msg = msg->next) {
775                 if ((msg->sender_pid == hdr.pid) &&
776                     (msg->sender_sock == hdr.sock)) {
777                         break;
778                 }
779         }
780
781         if ((msg != NULL) && (msg->cookie != cookie)) {
782                 DLIST_REMOVE(ctx->msgs, msg);
783                 free(msg);
784                 msg = NULL;
785         }
786
787         if (msg == NULL) {
788                 msg = malloc(offsetof(struct unix_msg, buf) + hdr.msglen);
789                 if (msg == NULL) {
790                         return;
791                 }
792                 *msg = (struct unix_msg) {
793                         .msglen = hdr.msglen,
794                         .sender_pid = hdr.pid,
795                         .sender_sock = hdr.sock,
796                         .cookie = cookie
797                 };
798                 DLIST_ADD(ctx->msgs, msg);
799         }
800
801         space = msg->msglen - msg->received;
802         if (buflen > space) {
803                 return;
804         }
805
806         memcpy(msg->buf + msg->received, buf, buflen);
807         msg->received += buflen;
808
809         if (msg->received < msg->msglen) {
810                 return;
811         }
812
813         DLIST_REMOVE(ctx->msgs, msg);
814         ctx->recv_callback(ctx, msg->buf, msg->msglen, ctx->private_data);
815         free(msg);
816 }
817
818 int unix_msg_free(struct unix_msg_ctx *ctx)
819 {
820         int ret;
821
822         ret = unix_dgram_free(ctx->dgram);
823         if (ret != 0) {
824                 return ret;
825         }
826
827         while (ctx->msgs != NULL) {
828                 struct unix_msg *msg = ctx->msgs;
829                 DLIST_REMOVE(ctx->msgs, msg);
830                 free(msg);
831         }
832
833         free(ctx);
834         return 0;
835 }
836
837 static ssize_t iov_buflen(const struct iovec *iov, int iovlen)
838 {
839         size_t buflen = 0;
840         int i;
841
842         for (i=0; i<iovlen; i++) {
843                 size_t thislen = iov[i].iov_len;
844                 size_t tmp = buflen + thislen;
845
846                 if ((tmp < buflen) || (tmp < thislen)) {
847                         /* overflow */
848                         return -1;
849                 }
850                 buflen = tmp;
851         }
852         return buflen;
853 }