recoverd: Refactor code to ban misbehaving nodes
[ctdb.git] / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "../include/ctdb.h"
31
/*
  Structures for packet queueing, used by the queue code below.
*/

/* An incoming packet that has only been partly read from the fd. */
struct ctdb_partial {
	uint8_t  *data;   /* receive buffer; NULL when no packet is in progress */
	uint32_t  length; /* number of bytes of 'data' filled so far */
};
37
/* One outgoing packet waiting (possibly partly sent) on the send queue. */
struct ctdb_queue_pkt {
	struct ctdb_queue_pkt *next;  /* list links for queue->out_queue */
	struct ctdb_queue_pkt *prev;
	uint8_t  *data;               /* next unsent byte; advanced after partial writes */
	uint32_t  length;             /* bytes still to be written from 'data' */
	uint32_t  full_length;        /* whole packet length; differs from 'length'
	                                 once part of the packet has been sent */
};
44
45 struct ctdb_queue {
46         struct ctdb_context *ctdb;
47         struct ctdb_partial partial; /* partial input packet */
48         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
49         uint32_t out_queue_length;
50         struct fd_event *fde;
51         int fd;
52         size_t alignment;
53         void *private_data;
54         ctdb_queue_cb_fn_t callback;
55         bool *destroyed;
56 };
57
58
59
60 int ctdb_queue_length(struct ctdb_queue *queue)
61 {
62         return queue->out_queue_length;
63 }
64
65 /*
66   called when an incoming connection is readable
67   This function MUST be safe for reentry via the queue callback!
68 */
69 static void queue_io_read(struct ctdb_queue *queue)
70 {
71         int num_ready = 0;
72         uint32_t sz_bytes_req;
73         uint32_t pkt_size;
74         uint32_t pkt_bytes_remaining;
75         uint32_t to_read;
76         ssize_t nread;
77         uint8_t *data;
78
79         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
80                 return;
81         }
82         if (num_ready == 0) {
83                 /* the descriptor has been closed */
84                 goto failed;
85         }
86
87         if (queue->partial.data == NULL) {
88                 /* starting fresh, allocate buf for size bytes */
89                 sz_bytes_req = sizeof(pkt_size);
90                 queue->partial.data = talloc_size(queue, sz_bytes_req);
91                 if (queue->partial.data == NULL) {
92                         DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
93                                          sz_bytes_req));
94                         goto failed;
95                 }
96         } else if (queue->partial.length < sizeof(pkt_size)) {
97                 /* yet to find out the packet length */
98                 sz_bytes_req = sizeof(pkt_size) - queue->partial.length;
99         } else {
100                 /* partial packet, length known, full buf allocated */
101                 sz_bytes_req = 0;
102         }
103         data = queue->partial.data;
104
105         if (sz_bytes_req > 0) {
106                 to_read = MIN(sz_bytes_req, num_ready);
107                 nread = read(queue->fd, data + queue->partial.length,
108                              to_read);
109                 if (nread <= 0) {
110                         DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
111                         goto failed;
112                 }
113                 queue->partial.length += nread;
114
115                 if (nread < sz_bytes_req) {
116                         /* not enough to know the length */
117                         DEBUG(DEBUG_DEBUG,("Partial packet length read\n"));
118                         return;
119                 }
120                 /* size now known, allocate buffer for the full packet */
121                 queue->partial.data = talloc_realloc_size(queue, data,
122                                                           *(uint32_t *)data);
123                 if (queue->partial.data == NULL) {
124                         DEBUG(DEBUG_ERR,("read error alloc failed for %u\n",
125                                          *(uint32_t *)data));
126                         goto failed;
127                 }
128                 data = queue->partial.data;
129                 num_ready -= nread;
130         }
131
132         pkt_size = *(uint32_t *)data;
133         if (pkt_size == 0) {
134                 DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
135                 goto failed;
136         }
137
138         pkt_bytes_remaining = pkt_size - queue->partial.length;
139         to_read = MIN(pkt_bytes_remaining, num_ready);
140         nread = read(queue->fd, data + queue->partial.length,
141                      to_read);
142         if (nread <= 0) {
143                 DEBUG(DEBUG_ERR,("read error nread=%d\n",
144                                  (int)nread));
145                 goto failed;
146         }
147         queue->partial.length += nread;
148
149         if (queue->partial.length < pkt_size) {
150                 DEBUG(DEBUG_DEBUG,("Partial packet data read\n"));
151                 return;
152         }
153
154         queue->partial.data = NULL;
155         queue->partial.length = 0;
156         /* it is the responsibility of the callback to free 'data' */
157         queue->callback(data, pkt_size, queue->private_data);
158         return;
159
160 failed:
161         queue->callback(NULL, 0, queue->private_data);
162 }
163
164
165 /* used when an event triggers a dead queue */
166 static void queue_dead(struct event_context *ev, struct timed_event *te, 
167                        struct timeval t, void *private_data)
168 {
169         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
170         queue->callback(NULL, 0, queue->private_data);
171 }
172
173
174 /*
175   called when an incoming connection is writeable
176 */
177 static void queue_io_write(struct ctdb_queue *queue)
178 {
179         while (queue->out_queue) {
180                 struct ctdb_queue_pkt *pkt = queue->out_queue;
181                 ssize_t n;
182                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
183                         n = write(queue->fd, pkt->data, 1);
184                 } else {
185                         n = write(queue->fd, pkt->data, pkt->length);
186                 }
187
188                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
189                         if (pkt->length != pkt->full_length) {
190                                 /* partial packet sent - we have to drop it */
191                                 DLIST_REMOVE(queue->out_queue, pkt);
192                                 queue->out_queue_length--;
193                                 talloc_free(pkt);
194                         }
195                         talloc_free(queue->fde);
196                         queue->fde = NULL;
197                         queue->fd = -1;
198                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
199                                         queue_dead, queue);
200                         return;
201                 }
202                 if (n <= 0) return;
203                 
204                 if (n != pkt->length) {
205                         pkt->length -= n;
206                         pkt->data += n;
207                         return;
208                 }
209
210                 DLIST_REMOVE(queue->out_queue, pkt);
211                 queue->out_queue_length--;
212                 talloc_free(pkt);
213         }
214
215         EVENT_FD_NOT_WRITEABLE(queue->fde);
216 }
217
218 /*
219   called when an incoming connection is readable or writeable
220 */
221 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
222                              uint16_t flags, void *private_data)
223 {
224         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
225
226         if (flags & EVENT_FD_READ) {
227                 queue_io_read(queue);
228         } else {
229                 queue_io_write(queue);
230         }
231 }
232
233
234 /*
235   queue a packet for sending
236 */
237 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
238 {
239         struct ctdb_queue_pkt *pkt;
240         uint32_t length2, full_length;
241
242         if (queue->alignment) {
243                 /* enforce the length and alignment rules from the tcp packet allocator */
244                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
245                 *(uint32_t *)data = length2;
246         } else {
247                 length2 = length;
248         }
249
250         if (length2 != length) {
251                 memset(data+length, 0, length2-length);
252         }
253
254         full_length = length2;
255         
256         /* if the queue is empty then try an immediate write, avoiding
257            queue overhead. This relies on non-blocking sockets */
258         if (queue->out_queue == NULL && queue->fd != -1 &&
259             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
260                 ssize_t n = write(queue->fd, data, length2);
261                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
262                         talloc_free(queue->fde);
263                         queue->fde = NULL;
264                         queue->fd = -1;
265                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
266                                         queue_dead, queue);
267                         /* yes, we report success, as the dead node is 
268                            handled via a separate event */
269                         return 0;
270                 }
271                 if (n > 0) {
272                         data += n;
273                         length2 -= n;
274                 }
275                 if (length2 == 0) return 0;
276         }
277
278         pkt = talloc(queue, struct ctdb_queue_pkt);
279         CTDB_NO_MEMORY(queue->ctdb, pkt);
280
281         pkt->data = talloc_memdup(pkt, data, length2);
282         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
283
284         pkt->length = length2;
285         pkt->full_length = full_length;
286
287         if (queue->out_queue == NULL && queue->fd != -1) {
288                 EVENT_FD_WRITEABLE(queue->fde);
289         }
290
291         DLIST_ADD_END(queue->out_queue, pkt, NULL);
292
293         queue->out_queue_length++;
294
295         if (queue->ctdb->tunable.verbose_memory_names != 0) {
296                 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
297                 switch (hdr->operation) {
298                 case CTDB_REQ_CONTROL: {
299                         struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
300                         talloc_set_name(pkt, "ctdb_queue_pkt: control opcode=%u srvid=%llu datalen=%u",
301                                         (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
302                         break;
303                 }
304                 case CTDB_REQ_MESSAGE: {
305                         struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
306                         talloc_set_name(pkt, "ctdb_queue_pkt: message srvid=%llu datalen=%u",
307                                         (unsigned long long)m->srvid, (unsigned)m->datalen);
308                         break;
309                 }
310                 default:
311                         talloc_set_name(pkt, "ctdb_queue_pkt: operation=%u length=%u src=%u dest=%u",
312                                         (unsigned)hdr->operation, (unsigned)hdr->length, 
313                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
314                         break;
315                 }
316         }
317
318         return 0;
319 }
320
321
322 /*
323   setup the fd used by the queue
324  */
325 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
326 {
327         queue->fd = fd;
328         talloc_free(queue->fde);
329         queue->fde = NULL;
330
331         if (fd != -1) {
332                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
333                                           queue_io_handler, queue);
334                 if (queue->fde == NULL) {
335                         return -1;
336                 }
337
338                 if (queue->out_queue) {
339                         EVENT_FD_WRITEABLE(queue->fde);         
340                 }
341         }
342
343         return 0;
344 }
345
346 /* If someone sets up this pointer, they want to know if the queue is freed */
347 static int queue_destructor(struct ctdb_queue *queue)
348 {
349         if (queue->destroyed != NULL)
350                 *queue->destroyed = true;
351         return 0;
352 }
353
354 /*
355   setup a packet queue on a socket
356  */
357 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
358                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
359                                     
360                                     ctdb_queue_cb_fn_t callback,
361                                     void *private_data)
362 {
363         struct ctdb_queue *queue;
364
365         queue = talloc_zero(mem_ctx, struct ctdb_queue);
366         CTDB_NO_MEMORY_NULL(ctdb, queue);
367
368         queue->ctdb = ctdb;
369         queue->fd = fd;
370         queue->alignment = alignment;
371         queue->private_data = private_data;
372         queue->callback = callback;
373         if (fd != -1) {
374                 if (ctdb_queue_set_fd(queue, fd) != 0) {
375                         talloc_free(queue);
376                         return NULL;
377                 }
378         }
379         talloc_set_destructor(queue, queue_destructor);
380
381         return queue;
382 }