2 Generic Unix-domain Socket I/O
4 Copyright (C) Amitay Isaacs 2016
6 This program is free software; you can redistribute it and/or modify
7 it under the terms of the GNU General Public License as published by
8 the Free Software Foundation; either version 3 of the License, or
9 (at your option) any later version.
11 This program is distributed in the hope that it will be useful,
12 but WITHOUT ANY WARRANTY; without even the implied warranty of
13 MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14 GNU General Public License for more details.
16 You should have received a copy of the GNU General Public License
17 along with this program; if not, see <http://www.gnu.org/licenses/>.
21 #include "system/filesys.h"
22 #include "system/network.h"
27 #include "lib/util/sys_rw.h"
28 #include "lib/util/debug.h"
29 #include "lib/util/blocking.h"
31 #include "common/logging.h"
32 #include "common/sock_io.h"
34 int sock_connect(const char *sockpath)
36 struct sockaddr_un addr;
40 if (sockpath == NULL) {
41 D_ERR("Invalid socket path\n");
45 memset(&addr, 0, sizeof(addr));
46 addr.sun_family = AF_UNIX;
47 len = strlcpy(addr.sun_path, sockpath, sizeof(addr.sun_path));
48 if (len >= sizeof(addr.sun_path)) {
49 D_ERR("Socket path too long, len=%zu\n", strlen(sockpath));
53 fd = socket(AF_UNIX, SOCK_STREAM, 0);
55 D_ERR("socket() failed, errno=%d\n", errno);
59 ret = connect(fd, (struct sockaddr *)&addr, sizeof(addr));
61 D_ERR("connect() failed, errno=%d\n", errno);
70 struct tevent_context *ev;
71 sock_queue_callback_fn_t callback;
75 struct tevent_immediate *im;
76 struct tevent_queue *queue;
77 struct tevent_fd *fde;
79 size_t buflen, begin, end;
82 static bool sock_queue_set_fd(struct sock_queue *queue, int fd);
83 static int sock_queue_destructor(struct sock_queue *queue);
84 static void sock_queue_handler(struct tevent_context *ev,
85 struct tevent_fd *fde, uint16_t flags,
87 static void sock_queue_process(struct sock_queue *queue);
88 static void sock_queue_process_event(struct tevent_context *ev,
89 struct tevent_immediate *im,
92 struct sock_queue *sock_queue_setup(TALLOC_CTX *mem_ctx,
93 struct tevent_context *ev,
95 sock_queue_callback_fn_t callback,
98 struct sock_queue *queue;
100 queue = talloc_zero(mem_ctx, struct sock_queue);
106 queue->callback = callback;
107 queue->private_data = private_data;
109 queue->im = tevent_create_immediate(queue);
110 if (queue->im == NULL) {
115 queue->queue = tevent_queue_create(queue, "out-queue");
116 if (queue->queue == NULL) {
121 if (! sock_queue_set_fd(queue, fd)) {
126 talloc_set_destructor(queue, sock_queue_destructor);
131 static bool sock_queue_set_fd(struct sock_queue *queue, int fd)
133 TALLOC_FREE(queue->fde);
139 ret = set_blocking(fd, false);
144 queue->fde = tevent_add_fd(queue->ev, queue, fd,
146 sock_queue_handler, queue);
147 if (queue->fde == NULL) {
150 tevent_fd_set_auto_close(queue->fde);
156 static int sock_queue_destructor(struct sock_queue *queue)
158 TALLOC_FREE(queue->fde);
164 static void sock_queue_handler(struct tevent_context *ev,
165 struct tevent_fd *fde, uint16_t flags,
168 struct sock_queue *queue = talloc_get_type_abort(
169 private_data, struct sock_queue);
173 ret = ioctl(queue->fd, FIONREAD, &num_ready);
179 if (num_ready == 0) {
180 /* descriptor has been closed */
184 if (num_ready > queue->buflen - queue->end) {
185 queue->buf = talloc_realloc_size(queue, queue->buf,
186 queue->end + num_ready);
187 if (queue->buf == NULL) {
190 queue->buflen = queue->end + num_ready;
193 nread = sys_read(queue->fd, queue->buf + queue->end, num_ready);
199 sock_queue_process(queue);
203 queue->callback(NULL, 0, queue->private_data);
206 static void sock_queue_process(struct sock_queue *queue)
210 if ((queue->end - queue->begin) < sizeof(uint32_t)) {
211 /* not enough data */
215 pkt_size = *(uint32_t *)(queue->buf + queue->begin);
217 D_ERR("Invalid packet of length 0\n");
218 queue->callback(NULL, 0, queue->private_data);
221 if ((queue->end - queue->begin) < pkt_size) {
222 /* not enough data */
226 queue->callback(queue->buf + queue->begin, pkt_size,
227 queue->private_data);
228 queue->begin += pkt_size;
230 if (queue->begin < queue->end) {
231 /* more data to be processed */
232 tevent_schedule_immediate(queue->im, queue->ev,
233 sock_queue_process_event, queue);
235 TALLOC_FREE(queue->buf);
242 static void sock_queue_process_event(struct tevent_context *ev,
243 struct tevent_immediate *im,
246 struct sock_queue *queue = talloc_get_type_abort(
247 private_data, struct sock_queue);
249 sock_queue_process(queue);
252 struct sock_queue_write_state {
257 static void sock_queue_trigger(struct tevent_req *req, void *private_data);
259 int sock_queue_write(struct sock_queue *queue, uint8_t *buf, size_t buflen)
261 struct tevent_req *req;
262 struct sock_queue_write_state *state;
265 if (buflen >= INT32_MAX) {
269 req = tevent_req_create(queue, &state, struct sock_queue_write_state);
275 state->pkt_size = (uint32_t)buflen;
277 status = tevent_queue_add_entry(queue->queue, queue->ev, req,
278 sock_queue_trigger, queue);
287 static void sock_queue_trigger(struct tevent_req *req, void *private_data)
289 struct sock_queue *queue = talloc_get_type_abort(
290 private_data, struct sock_queue);
291 struct sock_queue_write_state *state = tevent_req_data(
292 req, struct sock_queue_write_state);
298 nwritten = sys_write(queue->fd, state->pkt + offset,
299 state->pkt_size - offset);
301 queue->callback(NULL, 0, queue->private_data);
306 } while (offset < state->pkt_size);
308 tevent_req_done(req);