b7feed9c118162257171354e4bdbdef9c0c65f3b
[sahlberg/ctdb.git] / common / ctdb_io.c
1 /* 
2    ctdb database library
3    Utility functions to read/write blobs of data from a file descriptor
4    and handle the case where we might need multiple read/writes to get all the
5    data.
6
7    Copyright (C) Andrew Tridgell  2006
8
9    This program is free software; you can redistribute it and/or modify
10    it under the terms of the GNU General Public License as published by
11    the Free Software Foundation; either version 3 of the License, or
12    (at your option) any later version.
13    
14    This program is distributed in the hope that it will be useful,
15    but WITHOUT ANY WARRANTY; without even the implied warranty of
16    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
17    GNU General Public License for more details.
18    
19    You should have received a copy of the GNU General Public License
20    along with this program; if not, see <http://www.gnu.org/licenses/>.
21 */
22
23 #include "includes.h"
24 #include "lib/tdb/include/tdb.h"
25 #include "lib/events/events.h"
26 #include "lib/util/dlinklist.h"
27 #include "system/network.h"
28 #include "system/filesys.h"
29 #include "../include/ctdb_private.h"
30 #include "../include/ctdb.h"
31
/* structures for packet queueing */

/*
  an incoming packet that has only been partly received so far.
  queue_io_read() appends newly read bytes to 'data' until at least one
  complete packet is available.
*/
struct ctdb_partial {
        uint8_t *data;          /* bytes received so far (talloc'ed off the queue) */
        uint32_t length;        /* number of valid bytes held in 'data' */
};
37
/*
  one outgoing packet waiting on a queue's out_queue list.
  'data' and 'length' describe what is still to be written; they are
  advanced as partial writes succeed, while 'full_length' keeps the
  size the packet had when it was handed to ctdb_queue_send().
*/
struct ctdb_queue_pkt {
        struct ctdb_queue_pkt *next;    /* list linkage (used by the DLIST_* macros) */
        struct ctdb_queue_pkt *prev;
        uint8_t *data;                  /* next byte to be written */
        uint32_t length;                /* bytes still to be written */
        uint32_t full_length;           /* total (padded) size of the packet */
};
44
45 struct ctdb_queue {
46         struct ctdb_context *ctdb;
47         struct ctdb_partial partial; /* partial input packet */
48         struct ctdb_queue_pkt *out_queue, *out_queue_tail;
49         uint32_t out_queue_length;
50         struct fd_event *fde;
51         int fd;
52         size_t alignment;
53         void *private_data;
54         ctdb_queue_cb_fn_t callback;
55         bool *destroyed;
56 };
57
58
59
60 int ctdb_queue_length(struct ctdb_queue *queue)
61 {
62         return queue->out_queue_length;
63 }
64
65 /*
66   called when an incoming connection is readable
67 */
68 static void queue_io_read(struct ctdb_queue *queue)
69 {
70         int num_ready = 0;
71         ssize_t nread;
72         uint8_t *data, *data_base;
73
74         if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
75                 return;
76         }
77         if (num_ready == 0) {
78                 /* the descriptor has been closed */
79                 goto failed;
80         }
81
82
83         queue->partial.data = talloc_realloc_size(queue, queue->partial.data, 
84                                                   num_ready + queue->partial.length);
85
86         if (queue->partial.data == NULL) {
87                 DEBUG(DEBUG_ERR,("read error alloc failed for %u\n", 
88                          num_ready + queue->partial.length));
89                 goto failed;
90         }
91
92         nread = read(queue->fd, queue->partial.data + queue->partial.length, num_ready);
93         if (nread <= 0) {
94                 DEBUG(DEBUG_ERR,("read error nread=%d\n", (int)nread));
95                 goto failed;
96         }
97
98
99         data = queue->partial.data;
100         nread += queue->partial.length;
101
102         queue->partial.data = NULL;
103         queue->partial.length = 0;
104
105         if (nread >= 4 && *(uint32_t *)data == nread) {
106                 /* it is the responsibility of the incoming packet
107                  function to free 'data' */
108                 queue->callback(data, nread, queue->private_data);
109                 return;
110         }
111
112         data_base = data;
113
114         while (nread >= 4 && *(uint32_t *)data <= nread) {
115                 /* we have at least one packet */
116                 uint8_t *d2;
117                 uint32_t len;
118                 bool destroyed = false;
119
120                 len = *(uint32_t *)data;
121                 if (len == 0) {
122                         /* bad packet! treat as EOF */
123                         DEBUG(DEBUG_CRIT,("Invalid packet of length 0\n"));
124                         goto failed;
125                 }
126                 d2 = talloc_memdup(queue, data, len);
127                 if (d2 == NULL) {
128                         DEBUG(DEBUG_ERR,("read error memdup failed for %u\n", len));
129                         /* sigh */
130                         goto failed;
131                 }
132
133                 queue->destroyed = &destroyed;
134                 queue->callback(d2, len, queue->private_data);
135                 /* If callback freed us, don't do anything else. */
136                 if (destroyed) {
137                         return;
138                 }
139                 queue->destroyed = NULL;
140
141                 data += len;
142                 nread -= len;           
143         }
144
145         if (nread > 0) {
146                 /* we have only part of a packet */
147                 if (data_base == data) {
148                         queue->partial.data = data;
149                         queue->partial.length = nread;
150                 } else {
151                         queue->partial.data = talloc_memdup(queue, data, nread);
152                         if (queue->partial.data == NULL) {
153                                 DEBUG(DEBUG_ERR,("read error memdup partial failed for %u\n", 
154                                          (unsigned)nread));
155                                 goto failed;
156                         }
157                         queue->partial.length = nread;
158                         talloc_free(data_base);
159                 }
160                 return;
161         }
162
163         talloc_free(data_base);
164         return;
165
166 failed:
167         queue->callback(NULL, 0, queue->private_data);
168 }
169
170
171 /* used when an event triggers a dead queue */
172 static void queue_dead(struct event_context *ev, struct timed_event *te, 
173                        struct timeval t, void *private_data)
174 {
175         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
176         queue->callback(NULL, 0, queue->private_data);
177 }
178
179
180 /*
181   called when an incoming connection is writeable
182 */
183 static void queue_io_write(struct ctdb_queue *queue)
184 {
185         while (queue->out_queue) {
186                 struct ctdb_queue_pkt *pkt = queue->out_queue;
187                 ssize_t n;
188                 if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
189                         n = write(queue->fd, pkt->data, 1);
190                 } else {
191                         n = write(queue->fd, pkt->data, pkt->length);
192                 }
193
194                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
195                         if (pkt->length != pkt->full_length) {
196                                 /* partial packet sent - we have to drop it */
197                                 DLIST_REMOVE(queue->out_queue, pkt);
198                                 queue->out_queue_length--;
199                                 talloc_free(pkt);
200                         }
201                         talloc_free(queue->fde);
202                         queue->fde = NULL;
203                         queue->fd = -1;
204                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
205                                         queue_dead, queue);
206                         return;
207                 }
208                 if (n <= 0) return;
209                 
210                 if (n != pkt->length) {
211                         pkt->length -= n;
212                         pkt->data += n;
213                         return;
214                 }
215
216                 DLIST_REMOVE(queue->out_queue, pkt);
217                 queue->out_queue_length--;
218                 talloc_free(pkt);
219         }
220
221         EVENT_FD_NOT_WRITEABLE(queue->fde);
222 }
223
224 /*
225   called when an incoming connection is readable or writeable
226 */
227 static void queue_io_handler(struct event_context *ev, struct fd_event *fde, 
228                              uint16_t flags, void *private_data)
229 {
230         struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
231
232         if (flags & EVENT_FD_READ) {
233                 queue_io_read(queue);
234         } else {
235                 queue_io_write(queue);
236         }
237 }
238
239
240 /*
241   queue a packet for sending
242 */
243 int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
244 {
245         struct ctdb_queue_pkt *pkt;
246         uint32_t length2, full_length;
247
248         if (queue->alignment) {
249                 /* enforce the length and alignment rules from the tcp packet allocator */
250                 length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
251                 *(uint32_t *)data = length2;
252         } else {
253                 length2 = length;
254         }
255
256         if (length2 != length) {
257                 memset(data+length, 0, length2-length);
258         }
259
260         full_length = length2;
261         
262         /* if the queue is empty then try an immediate write, avoiding
263            queue overhead. This relies on non-blocking sockets */
264         if (queue->out_queue == NULL && queue->fd != -1 &&
265             !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
266                 ssize_t n = write(queue->fd, data, length2);
267                 if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
268                         talloc_free(queue->fde);
269                         queue->fde = NULL;
270                         queue->fd = -1;
271                         event_add_timed(queue->ctdb->ev, queue, timeval_zero(), 
272                                         queue_dead, queue);
273                         /* yes, we report success, as the dead node is 
274                            handled via a separate event */
275                         return 0;
276                 }
277                 if (n > 0) {
278                         data += n;
279                         length2 -= n;
280                 }
281                 if (length2 == 0) return 0;
282         }
283
284         pkt = talloc(queue, struct ctdb_queue_pkt);
285         CTDB_NO_MEMORY(queue->ctdb, pkt);
286
287         pkt->data = talloc_memdup(pkt, data, length2);
288         CTDB_NO_MEMORY(queue->ctdb, pkt->data);
289
290         pkt->length = length2;
291         pkt->full_length = full_length;
292
293         if (queue->out_queue == NULL && queue->fd != -1) {
294                 EVENT_FD_WRITEABLE(queue->fde);
295         }
296
297         DLIST_ADD_END(queue->out_queue, pkt, NULL);
298
299         queue->out_queue_length++;
300
301         if (queue->ctdb->tunable.verbose_memory_names != 0) {
302                 struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
303                 switch (hdr->operation) {
304                 case CTDB_REQ_CONTROL: {
305                         struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
306                         talloc_set_name(pkt, "ctdb_queue_pkt: control opcode=%u srvid=%llu datalen=%u",
307                                         (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
308                         break;
309                 }
310                 case CTDB_REQ_MESSAGE: {
311                         struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
312                         talloc_set_name(pkt, "ctdb_queue_pkt: message srvid=%llu datalen=%u",
313                                         (unsigned long long)m->srvid, (unsigned)m->datalen);
314                         break;
315                 }
316                 default:
317                         talloc_set_name(pkt, "ctdb_queue_pkt: operation=%u length=%u src=%u dest=%u",
318                                         (unsigned)hdr->operation, (unsigned)hdr->length, 
319                                         (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
320                         break;
321                 }
322         }
323
324         return 0;
325 }
326
327
328 /*
329   setup the fd used by the queue
330  */
331 int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
332 {
333         queue->fd = fd;
334         talloc_free(queue->fde);
335         queue->fde = NULL;
336
337         if (fd != -1) {
338                 queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ|EVENT_FD_AUTOCLOSE, 
339                                           queue_io_handler, queue);
340                 if (queue->fde == NULL) {
341                         return -1;
342                 }
343
344                 if (queue->out_queue) {
345                         EVENT_FD_WRITEABLE(queue->fde);         
346                 }
347         }
348
349         return 0;
350 }
351
352 /* If someone sets up this pointer, they want to know if the queue is freed */
353 static int queue_destructor(struct ctdb_queue *queue)
354 {
355         if (queue->destroyed != NULL)
356                 *queue->destroyed = true;
357         return 0;
358 }
359
360 /*
361   setup a packet queue on a socket
362  */
363 struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
364                                     TALLOC_CTX *mem_ctx, int fd, int alignment,
365                                     
366                                     ctdb_queue_cb_fn_t callback,
367                                     void *private_data)
368 {
369         struct ctdb_queue *queue;
370
371         queue = talloc_zero(mem_ctx, struct ctdb_queue);
372         CTDB_NO_MEMORY_NULL(ctdb, queue);
373
374         queue->ctdb = ctdb;
375         queue->fd = fd;
376         queue->alignment = alignment;
377         queue->private_data = private_data;
378         queue->callback = callback;
379         if (fd != -1) {
380                 if (ctdb_queue_set_fd(queue, fd) != 0) {
381                         talloc_free(queue);
382                         return NULL;
383                 }
384         }
385         talloc_set_destructor(queue, queue_destructor);
386
387         return queue;
388 }