LibCTDB
[sahlberg/ctdb.git] / libctdb / io_elem.c
1 /*
2    Simple queuing of input and output records for libctdb
3
4    Copyright (C) Rusty Russell 2010
5
6    This program is free software; you can redistribute it and/or modify
7    it under the terms of the GNU General Public License as published by
8    the Free Software Foundation; either version 3 of the License, or
9    (at your option) any later version.
10
11    This program is distributed in the hope that it will be useful,
12    but WITHOUT ANY WARRANTY; without even the implied warranty of
13    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14    GNU General Public License for more details.
15
16    You should have received a copy of the GNU General Public License
17    along with this program; if not, see <http://www.gnu.org/licenses/>.
18 */
19 #include <sys/types.h>
20 #include <string.h>
21 #include <stdint.h>
22 #include <stdbool.h>
23 #include <unistd.h>
24 #include <errno.h>
25 #include <stdlib.h>
26 #include "libctdb_private.h"
27 #include "io_elem.h"
28 #include <tdb.h>
29 #include <netinet/in.h>
30 #include <dlinklist.h>
31 #include <ctdb_protocol.h> // For CTDB_DS_ALIGNMENT and ctdb_req_header
32
33 struct io_elem {
34         struct io_elem *next, *prev;
35         size_t len, off;
36         char *data;
37 };
38
39 struct io_elem *new_io_elem(size_t len)
40 {
41         struct io_elem *elem;
42         size_t ask = len;
43
44         len = (len + (CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1);
45
46         elem = malloc(sizeof(*elem));
47         if (!elem)
48                 return NULL;
49         elem->data = malloc(len);
50         if (!elem->data) {
51                 free(elem);
52                 return NULL;
53         }
54
55         /* stamp out any padding to keep valgrind happy */
56         if (ask != len) {
57                 memset(elem->data + ask, 0, len-ask);
58         }
59         elem->len = len;
60         elem->off = 0;
61         elem->next = NULL;
62         elem->prev = NULL;
63         return elem;
64 }
65
66 void free_io_elem(struct io_elem *io)
67 {
68         free(io->data);
69         free(io);
70 }
71
72 bool io_elem_finished(const struct io_elem *io)
73 {
74         return io->off == io->len;
75 }
76
77 void io_elem_init_req_header(struct io_elem *io,
78                              uint32_t operation,
79                              uint32_t destnode,
80                              uint32_t reqid)
81 {
82         struct ctdb_req_header *hdr = io_elem_data(io, NULL);
83
84         hdr->length = io->len;
85         hdr->ctdb_magic = CTDB_MAGIC;
86         hdr->ctdb_version = CTDB_VERSION;
87         /* Generation and srcnode only used for inter-ctdbd communication. */
88         hdr->generation = 0;
89         hdr->destnode = destnode;
90         hdr->srcnode = 0;
91         hdr->operation = operation;
92         hdr->reqid = reqid;
93 }
94
95 /* Access to raw data: if len is non-NULL it is filled in. */
96 void *io_elem_data(const struct io_elem *io, size_t *len)
97 {
98         if (len)
99                 *len = io->len;
100         return io->data;
101 }
102
103 /* Returns -1 if we hit an error.  Errno will be set. */
104 int read_io_elem(int fd, struct io_elem *io)
105 {
106         ssize_t ret;
107
108         ret = read(fd, io->data + io->off, io->len - io->off);
109         if (ret < 0)
110                 return ret;
111
112         io->off += ret;
113         if (io_elem_finished(io)) {
114                 struct ctdb_req_header *hdr = (void *)io->data;
115
116                 /* Finished.  But maybe this was just header? */
117                 if (io->len == sizeof(*hdr) && hdr->length > io->len) {
118                         int reret;
119                         void *newdata;
120                         /* Enlarge and re-read. */
121                         io->len = hdr->length;
122                         newdata = realloc(io->data, io->len);
123                         if (!newdata)
124                                 return -1;
125                         io->data = newdata;
126                         /* Try reading again immediately. */
127                         reret = read_io_elem(fd, io);
128                         if (reret >= 0)
129                                 reret += ret;
130                         return reret;
131                 }
132         }
133         return ret;
134 }
135
136 /* Returns -1 if we hit an error.  Errno will be set. */
137 int write_io_elem(int fd, struct io_elem *io)
138 {
139         ssize_t ret;
140
141         ret = write(fd, io->data + io->off, io->len - io->off);
142         if (ret < 0)
143                 return ret;
144
145         io->off += ret;
146         return ret;
147 }
148
149 void io_elem_reset(struct io_elem *io)
150 {
151         io->off = 0;
152 }
153
154 void io_elem_queue(struct ctdb_connection *ctdb, struct io_elem *io)
155 {
156         DLIST_ADD_END(ctdb->inqueue, io, struct io_elem);
157 }
158
159 void io_elem_dequeue(struct ctdb_connection *ctdb, struct io_elem *io)
160 {
161         DLIST_REMOVE(ctdb->inqueue, io);
162 }
163