/*
   ctdb database library
   Utility functions to read/write blobs of data from a file descriptor
   and handle the case where we might need multiple read/writes to get all the
   data.

   Copyright (C) Andrew Tridgell  2006

   This program is free software; you can redistribute it and/or modify
   it under the terms of the GNU General Public License as published by
   the Free Software Foundation; either version 3 of the License, or
   (at your option) any later version.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, see <http://www.gnu.org/licenses/>.
*/

#include "includes.h"
#include "tdb.h"
#include "lib/util/dlinklist.h"
#include "system/network.h"
#include "system/filesys.h"
#include "../include/ctdb_private.h"
#include "../include/ctdb_client.h"
#include <stdarg.h>

#define QUEUE_BUFFER_SIZE       (16*1024)

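/*
 * Initial allocation size for a queue's input buffer.  queue_io_read()
 * allocates the buffer at this size; if a single packet is larger,
 * queue_process() records the required size in buffer.extend and the buffer
 * is grown before the next read.  Once such an oversized packet has been
 * consumed and the buffer drains, the enlarged buffer is freed again.
 */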
/* structures for packet queueing - see common/ctdb_io.c */
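/*
 * Input buffer for a queue: 'data' holds the raw bytes read from the fd,
 * 'length' is how many of them are currently valid, 'size' is the allocated
 * size of 'data', and a non-zero 'extend' asks queue_io_read() to grow the
 * buffer to that many bytes before reading again (set by queue_process()
 * when an incoming packet is larger than the current buffer).
 */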
struct ctdb_buffer {
        uint8_t *data;
        uint32_t length;
        uint32_t size;
        uint32_t extend;
};

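/*
 * One queued outgoing packet.  'data' and 'length' describe the bytes still
 * to be written and are advanced/reduced as partial writes complete;
 * 'full_length' keeps the original packet size so queue_io_write() can tell
 * whether a packet that hits a write error was already partially sent.
 */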
struct ctdb_queue_pkt {
        struct ctdb_queue_pkt *next, *prev;
        uint8_t *data;
        uint32_t length;
        uint32_t full_length;
};

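/*
 * A packet queue wraps one file descriptor: 'buffer' collects incoming bytes
 * until at least one complete packet is available, 'out_queue' holds packets
 * waiting to be written, and 'callback' is invoked with each complete
 * incoming packet (or with NULL data on error/EOF).  If a caller sets
 * 'destroyed' to point at a bool, that bool is set to true when the queue is
 * freed, so the caller can detect destruction from within the callback.
 */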
struct ctdb_queue {
        struct ctdb_context *ctdb;
        struct tevent_immediate *im;
        struct ctdb_buffer buffer; /* input buffer */
        struct ctdb_queue_pkt *out_queue, *out_queue_tail;
        uint32_t out_queue_length;
        struct fd_event *fde;
        int fd;
        size_t alignment;
        void *private_data;
        ctdb_queue_cb_fn_t callback;
        bool *destroyed;
        const char *name;
};



int ctdb_queue_length(struct ctdb_queue *queue)
{
        return queue->out_queue_length;
}

static void queue_process(struct ctdb_queue *queue);

static void queue_process_event(struct tevent_context *ev, struct tevent_immediate *im,
                                void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

        queue_process(queue);
}

/*
 * Process data in the queue buffer.
 *
 * The queue callback can end up freeing the queue, so we must not loop here
 * processing packets from the queue buffer.  Instead, an immediate event is
 * scheduled to process any remaining packets in the buffer.
 */
static void queue_process(struct ctdb_queue *queue)
{
        uint32_t pkt_size;
        uint8_t *data;

        if (queue->buffer.length < sizeof(pkt_size)) {
                return;
        }

        pkt_size = *(uint32_t *)queue->buffer.data;
        if (pkt_size == 0) {
                DEBUG(DEBUG_CRIT, ("Invalid packet of length 0\n"));
                goto failed;
        }

        if (queue->buffer.length < pkt_size) {
                if (pkt_size > QUEUE_BUFFER_SIZE) {
                        queue->buffer.extend = pkt_size;
                }
                return;
        }

        /* Extract complete packet */
        data = talloc_size(queue, pkt_size);
        if (data == NULL) {
                DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", pkt_size));
                return;
        }
        memcpy(data, queue->buffer.data, pkt_size);

        /* Shift packet out from buffer */
        if (queue->buffer.length > pkt_size) {
                memmove(queue->buffer.data,
                        queue->buffer.data + pkt_size,
                        queue->buffer.length - pkt_size);
        }
        queue->buffer.length -= pkt_size;

        if (queue->buffer.length > 0) {
                /* There is more data to be processed, schedule an event */
                tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                          queue_process_event, queue);
        } else {
                if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
                        TALLOC_FREE(queue->buffer.data);
                        queue->buffer.size = 0;
                }
        }

        /* It is the responsibility of the callback to free 'data' */
        queue->callback(data, pkt_size, queue->private_data);
        return;

failed:
        queue->callback(NULL, 0, queue->private_data);

}
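
/*
 * Worked example of the framing above: every packet starts with a uint32_t
 * holding its total length (for ctdb packets this is the 'length' field at
 * the start of struct ctdb_req_header).  Suppose 100 bytes are sitting in the
 * buffer and the first length word reads 60: queue_process() copies those 60
 * bytes into a fresh talloc buffer, hands it to the callback, shifts the
 * remaining 40 bytes to the front of the buffer and schedules an immediate
 * event so the next packet is examined on the following event-loop pass
 * rather than in a loop here.
 */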


/*
  called when an incoming connection is readable
  This function MUST be safe for reentry via the queue callback!
*/
static void queue_io_read(struct ctdb_queue *queue)
{
        int num_ready = 0;
        ssize_t nread;
        uint8_t *data;
        int navail;

        /* Check how much data is available on the socket for immediate,
           guaranteed non-blocking access.  As long as we are careful never
           to read more than this, we know the reads will succeed and will
           neither block nor fail with a "data not available right now"
           error.
        */
        if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
                return;
        }
        if (num_ready == 0) {
                /* the descriptor has been closed */
                goto failed;
        }

        if (queue->buffer.data == NULL) {
                /* starting fresh, allocate buf to read data */
                queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
                if (queue->buffer.data == NULL) {
                        DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
                        goto failed;
                }
                queue->buffer.size = QUEUE_BUFFER_SIZE;
        } else if (queue->buffer.extend > 0) {
                /* extending buffer */
                data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
                if (data == NULL) {
                        DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
                        goto failed;
                }
                queue->buffer.data = data;
                queue->buffer.size = queue->buffer.extend;
                queue->buffer.extend = 0;
        }

        navail = queue->buffer.size - queue->buffer.length;
        if (num_ready > navail) {
                num_ready = navail;
        }

        if (num_ready > 0) {
                nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
                if (nread <= 0) {
                        DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
                        goto failed;
                }
                queue->buffer.length += nread;
        }

        queue_process(queue);
        return;

failed:
        queue->callback(NULL, 0, queue->private_data);
}


/* used when an event triggers a dead queue */
static void queue_dead(struct event_context *ev, struct tevent_immediate *im,
                       void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);
        queue->callback(NULL, 0, queue->private_data);
}


/*
  called when an incoming connection is writeable
*/
static void queue_io_write(struct ctdb_queue *queue)
{
        while (queue->out_queue) {
                struct ctdb_queue_pkt *pkt = queue->out_queue;
                ssize_t n;
                if (queue->ctdb->flags & CTDB_FLAG_TORTURE) {
                        n = write(queue->fd, pkt->data, 1);
                } else {
                        n = write(queue->fd, pkt->data, pkt->length);
                }

                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        if (pkt->length != pkt->full_length) {
                                /* partial packet sent - we have to drop it */
                                DLIST_REMOVE(queue->out_queue, pkt);
                                queue->out_queue_length--;
                                talloc_free(pkt);
                        }
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
                        tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                                  queue_dead, queue);
                        return;
                }
                if (n <= 0) return;

                if (n != pkt->length) {
                        pkt->length -= n;
                        pkt->data += n;
                        return;
                }

                DLIST_REMOVE(queue->out_queue, pkt);
                queue->out_queue_length--;
                talloc_free(pkt);
        }

        EVENT_FD_NOT_WRITEABLE(queue->fde);
}
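
/*
 * Worked example of the partial-write handling above: if a 100-byte packet is
 * queued and write() accepts only 40 bytes, pkt->data is advanced by 40 and
 * pkt->length drops to 60, so the remaining bytes go out the next time the fd
 * becomes writeable.  pkt->full_length stays at 100; should the connection
 * then fail, the mismatch tells us the packet was already partially sent and
 * must be dropped rather than retried from the start.
 */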

/*
  called when an incoming connection is readable or writeable
*/
static void queue_io_handler(struct event_context *ev, struct fd_event *fde,
                             uint16_t flags, void *private_data)
{
        struct ctdb_queue *queue = talloc_get_type(private_data, struct ctdb_queue);

        if (flags & EVENT_FD_READ) {
                queue_io_read(queue);
        } else {
                queue_io_write(queue);
        }
}


/*
  queue a packet for sending
*/
int ctdb_queue_send(struct ctdb_queue *queue, uint8_t *data, uint32_t length)
{
        struct ctdb_queue_pkt *pkt;
        uint32_t length2, full_length;

        if (queue->alignment) {
                /* enforce the length and alignment rules from the tcp packet allocator */
                length2 = (length+(queue->alignment-1)) & ~(queue->alignment-1);
                *(uint32_t *)data = length2;
        } else {
                length2 = length;
        }

        if (length2 != length) {
                memset(data+length, 0, length2-length);
        }

        full_length = length2;

        /* if the queue is empty then try an immediate write, avoiding
           queue overhead. This relies on non-blocking sockets */
        if (queue->out_queue == NULL && queue->fd != -1 &&
            !(queue->ctdb->flags & CTDB_FLAG_TORTURE)) {
                ssize_t n = write(queue->fd, data, length2);
                if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
                        talloc_free(queue->fde);
                        queue->fde = NULL;
                        queue->fd = -1;
                        tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                                  queue_dead, queue);
                        /* yes, we report success, as the dead node is
                           handled via a separate event */
                        return 0;
                }
                if (n > 0) {
                        data += n;
                        length2 -= n;
                }
                if (length2 == 0) return 0;
        }

        pkt = talloc(queue, struct ctdb_queue_pkt);
        CTDB_NO_MEMORY(queue->ctdb, pkt);

        pkt->data = talloc_memdup(pkt, data, length2);
        CTDB_NO_MEMORY(queue->ctdb, pkt->data);

        pkt->length = length2;
        pkt->full_length = full_length;

        if (queue->out_queue == NULL && queue->fd != -1) {
                EVENT_FD_WRITEABLE(queue->fde);
        }

        DLIST_ADD_END(queue->out_queue, pkt, NULL);

        queue->out_queue_length++;

        if (queue->ctdb->tunable.verbose_memory_names != 0) {
                struct ctdb_req_header *hdr = (struct ctdb_req_header *)pkt->data;
                switch (hdr->operation) {
                case CTDB_REQ_CONTROL: {
                        struct ctdb_req_control *c = (struct ctdb_req_control *)hdr;
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s control opcode=%u srvid=%llu datalen=%u",
                                        queue->name, (unsigned)c->opcode, (unsigned long long)c->srvid, (unsigned)c->datalen);
                        break;
                }
                case CTDB_REQ_MESSAGE: {
                        struct ctdb_req_message *m = (struct ctdb_req_message *)hdr;
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s message srvid=%llu datalen=%u",
                                        queue->name, (unsigned long long)m->srvid, (unsigned)m->datalen);
                        break;
                }
                default:
                        talloc_set_name(pkt, "ctdb_queue_pkt: %s operation=%u length=%u src=%u dest=%u",
                                        queue->name, (unsigned)hdr->operation, (unsigned)hdr->length,
                                        (unsigned)hdr->srcnode, (unsigned)hdr->destnode);
                        break;
                }
        }

        return 0;
}
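
/*
 * Usage sketch for ctdb_queue_send() (illustrative only, not part of this
 * file).  The first four bytes of the packet must hold its length, since the
 * receive side (queue_process()) and the alignment code above both treat them
 * as the length word; when the queue has a non-zero alignment the length is
 * rounded up and the padding is zero-filled in place, so the buffer must be
 * writable and large enough for the padded size.  A header-only keepalive
 * packet, for example, could be queued like this:
 *
 *      struct ctdb_req_header *hdr;
 *      hdr = (struct ctdb_req_header *)talloc_zero_size(queue, sizeof(*hdr));
 *      if (hdr == NULL) {
 *              return -1;
 *      }
 *      hdr->length    = sizeof(*hdr);
 *      hdr->operation = CTDB_REQ_KEEPALIVE;
 *      if (ctdb_queue_send(queue, (uint8_t *)hdr, hdr->length) != 0) {
 *              DEBUG(DEBUG_ERR, ("failed to queue keepalive\n"));
 *      }
 *      talloc_free(hdr);
 *
 * Real callers normally fill in the remaining header fields (ctdb_magic,
 * ctdb_version, srcnode, destnode, reqid) before sending.
 */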


/*
  setup the fd used by the queue
 */
int ctdb_queue_set_fd(struct ctdb_queue *queue, int fd)
{
        queue->fd = fd;
        talloc_free(queue->fde);
        queue->fde = NULL;

        if (fd != -1) {
                queue->fde = event_add_fd(queue->ctdb->ev, queue, fd, EVENT_FD_READ,
                                          queue_io_handler, queue);
                if (queue->fde == NULL) {
                        return -1;
                }
                tevent_fd_set_auto_close(queue->fde);

                if (queue->out_queue) {
                        EVENT_FD_WRITEABLE(queue->fde);
                }
        }

        return 0;
}

/* If someone sets up this pointer, they want to know if the queue is freed */
static int queue_destructor(struct ctdb_queue *queue)
{
        TALLOC_FREE(queue->buffer.data);
        queue->buffer.length = 0;
        queue->buffer.size = 0;
        if (queue->destroyed != NULL)
                *queue->destroyed = true;
        return 0;
}

/*
  setup a packet queue on a socket
 */
struct ctdb_queue *ctdb_queue_setup(struct ctdb_context *ctdb,
                                    TALLOC_CTX *mem_ctx, int fd, int alignment,
                                    ctdb_queue_cb_fn_t callback,
                                    void *private_data, const char *fmt, ...)
{
        struct ctdb_queue *queue;
        va_list ap;

        queue = talloc_zero(mem_ctx, struct ctdb_queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue);
        va_start(ap, fmt);
        queue->name = talloc_vasprintf(mem_ctx, fmt, ap);
        va_end(ap);
        CTDB_NO_MEMORY_NULL(ctdb, queue->name);

        queue->im = tevent_create_immediate(queue);
        CTDB_NO_MEMORY_NULL(ctdb, queue->im);

        queue->ctdb = ctdb;
        queue->fd = fd;
        queue->alignment = alignment;
        queue->private_data = private_data;
        queue->callback = callback;
        if (fd != -1) {
                if (ctdb_queue_set_fd(queue, fd) != 0) {
                        talloc_free(queue);
                        return NULL;
                }
        }
        talloc_set_destructor(queue, queue_destructor);

        return queue;
}
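
/*
 * Overall usage sketch (illustrative only).  A transport creates one queue per
 * connection and receives complete packets through its callback; the names
 * 'my_handler', 'my_connection', 'conn' and 'addr_str' below are hypothetical
 * caller-side examples, not part of this file:
 *
 *      static void my_handler(uint8_t *data, size_t length, void *private_data)
 *      {
 *              struct my_connection *conn = private_data;
 *
 *              if (data == NULL) {
 *                      ... the fd was closed or errored; tear down conn ...
 *                      return;
 *              }
 *              ... handle one complete packet of 'length' bytes ...
 *              talloc_free(data);   the callback owns the packet memory
 *      }
 *
 *      queue = ctdb_queue_setup(ctdb, conn, fd, CTDB_DS_ALIGNMENT,
 *                               my_handler, conn, "connection to %s", addr_str);
 *
 * The fd must be a connected, non-blocking socket: both the immediate-write
 * path in ctdb_queue_send() and queue_io_write() assume writes never block,
 * and queue_io_read() relies on FIONREAD to size its reads.
 */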