#include "../include/ctdb_client.h"
#include <stdarg.h>
+#define QUEUE_BUFFER_SIZE (16*1024)
+
/* structures for packet queueing - see common/ctdb_io.c */
struct ctdb_buffer {
uint8_t *data;
uint32_t length;
uint32_t size;
+ uint32_t extend;
};
struct ctdb_queue_pkt {
}
if (queue->buffer.length < pkt_size) {
- DEBUG(DEBUG_DEBUG, ("Partial packet data read\n"));
+ if (pkt_size > QUEUE_BUFFER_SIZE) {
+ queue->buffer.extend = pkt_size;
+ }
return;
}
/* There is more data to be processed, schedule an event */
tevent_schedule_immediate(queue->im, queue->ctdb->ev,
queue_process_event, queue);
+ } else {
+ if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
+ TALLOC_FREE(queue->buffer.data);
+ queue->buffer.size = 0;
+ }
}
/* It is the responsibility of the callback to free 'data' */
int num_ready = 0;
ssize_t nread;
uint8_t *data;
+ int navail;
if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
return;
if (queue->buffer.data == NULL) {
/* starting fresh, allocate buf to read data */
- queue->buffer.data = talloc_size(queue, num_ready);
+ queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
if (queue->buffer.data == NULL) {
DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
goto failed;
}
- queue->buffer.size = num_ready;
- } else if (queue->buffer.length + num_ready > queue->buffer.size) {
+ queue->buffer.size = QUEUE_BUFFER_SIZE;
+ } else if (queue->buffer.extend > 0) {
/* extending buffer */
- data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.length + num_ready);
+ data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
if (data == NULL) {
- DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.length + num_ready));
+ DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
goto failed;
}
queue->buffer.data = data;
- queue->buffer.size = queue->buffer.length + num_ready;
+ queue->buffer.size = queue->buffer.extend;
+ queue->buffer.extend = 0;
}
- nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
- if (nread <= 0) {
- DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
- goto failed;
+ navail = queue->buffer.size - queue->buffer.length;
+ if (num_ready > navail) {
+ num_ready = navail;
+ }
+
+ if (num_ready > 0) {
+ nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
+ if (nread <= 0) {
+ DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
+ goto failed;
+ }
+ queue->buffer.length += nread;
}
- queue->buffer.length += nread;
queue_process(queue);
return;