common/io: Limit the queue buffer size for fair scheduling via tevent
authorAmitay Isaacs <amitay@gmail.com>
Wed, 21 Aug 2013 04:42:06 +0000 (14:42 +1000)
committerAmitay Isaacs <amitay@gmail.com>
Tue, 10 Sep 2013 05:03:40 +0000 (15:03 +1000)
If we process all the data available in a socket buffer, CTDB can stay busy
processing lots of packets via immediate event mechanism in tevent.  After
processing an immediate event, tevent returns without epoll_wait.  So as long
as there are immediate events, tevent will never poll other FDs.  CTDB will
report this as "Event handling took xx seconds" warning.  This is misleading
since CTDB is very busy processing packets, but never gets to the point of
polling FDs.

The improvement in socket handling made it worse when handling traverse
control.  There were lots of packets filled in the socket buffer quickly and
CTDB stayed busy processing those packets and not polling other FDs and timer
events.  This can lead to controls timing out and in worse case other nodes
marking busy node as disconnected.

Signed-off-by: Amitay Isaacs <amitay@gmail.com>
(cherry picked from commit 92939c1178d04116d842708bc2d6a9c2950e36cc)

common/ctdb_io.c

index 4e164d9d44aa73259e70c45d149c2d87679ec7b1..99c50c1f593edadfd01bcd664c96218551268e68 100644 (file)
 #include "../include/ctdb_client.h"
 #include <stdarg.h>
 
+#define QUEUE_BUFFER_SIZE      (16*1024)
+
 /* structures for packet queueing - see common/ctdb_io.c */
 struct ctdb_buffer {
        uint8_t *data;
        uint32_t length;
        uint32_t size;
+       uint32_t extend;
 };
 
 struct ctdb_queue_pkt {
@@ -114,7 +117,9 @@ static void queue_process(struct ctdb_queue *queue)
        }
 
        if (queue->buffer.length < pkt_size) {
-               DEBUG(DEBUG_DEBUG, ("Partial packet data read\n"));
+               if (pkt_size > QUEUE_BUFFER_SIZE) {
+                       queue->buffer.extend = pkt_size;
+               }
                return;
        }
 
@@ -138,6 +143,11 @@ static void queue_process(struct ctdb_queue *queue)
                /* There is more data to be processed, schedule an event */
                tevent_schedule_immediate(queue->im, queue->ctdb->ev,
                                          queue_process_event, queue);
+       } else {
+               if (queue->buffer.size > QUEUE_BUFFER_SIZE) {
+                       TALLOC_FREE(queue->buffer.data);
+                       queue->buffer.size = 0;
+               }
        }
 
        /* It is the responsibility of the callback to free 'data' */
@@ -159,6 +169,7 @@ static void queue_io_read(struct ctdb_queue *queue)
        int num_ready = 0;
        ssize_t nread;
        uint8_t *data;
+       int navail;
 
        if (ioctl(queue->fd, FIONREAD, &num_ready) != 0) {
                return;
@@ -170,29 +181,37 @@ static void queue_io_read(struct ctdb_queue *queue)
 
        if (queue->buffer.data == NULL) {
                /* starting fresh, allocate buf to read data */
-               queue->buffer.data = talloc_size(queue, num_ready);
+               queue->buffer.data = talloc_size(queue, QUEUE_BUFFER_SIZE);
                if (queue->buffer.data == NULL) {
                        DEBUG(DEBUG_ERR, ("read error alloc failed for %u\n", num_ready));
                        goto failed;
                }
-               queue->buffer.size = num_ready;
-       } else if (queue->buffer.length + num_ready > queue->buffer.size) {
+               queue->buffer.size = QUEUE_BUFFER_SIZE;
+       } else if (queue->buffer.extend > 0) {
                /* extending buffer */
-               data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.length + num_ready);
+               data = talloc_realloc_size(queue, queue->buffer.data, queue->buffer.extend);
                if (data == NULL) {
-                       DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.length + num_ready));
+                       DEBUG(DEBUG_ERR, ("read error realloc failed for %u\n", queue->buffer.extend));
                        goto failed;
                }
                queue->buffer.data = data;
-               queue->buffer.size = queue->buffer.length + num_ready;
+               queue->buffer.size = queue->buffer.extend;
+               queue->buffer.extend = 0;
        }
 
-       nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
-       if (nread <= 0) {
-               DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
-               goto failed;
+       navail = queue->buffer.size - queue->buffer.length;
+       if (num_ready > navail) {
+               num_ready = navail;
+       }
+
+       if (num_ready > 0) {
+               nread = read(queue->fd, queue->buffer.data + queue->buffer.length, num_ready);
+               if (nread <= 0) {
+                       DEBUG(DEBUG_ERR, ("read error nread=%d\n", (int)nread));
+                       goto failed;
+               }
+               queue->buffer.length += nread;
        }
-       queue->buffer.length += nread;
 
        queue_process(queue);
        return;