LibCTDB
[sahlberg/ctdb.git] / libctdb / ctdb.c
index 23acef4ada9ae6b26c0668b5da8471df7d3afc03..264952485797a065fb3d689ef20dc6ed2face204 100644 (file)
@@ -24,6 +24,7 @@
 #include <stdlib.h>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <sys/ioctl.h>
 #include "libctdb_private.h"
 #include "io_elem.h"
 #include "local_tdb.h"
@@ -417,6 +418,19 @@ bool ctdb_service(struct ctdb_connection *ctdb, int revents)
 
        while (revents & POLLIN) {
                int ret;
+               int num_ready = 0;
+
+               if (ioctl(ctdb->fd, FIONREAD, &num_ready) != 0) {
+                       DEBUG(ctdb, LOG_ERR,
+                             "ctdb_service: ioctl(FIONREAD) %d", errno);
+                       ctdb->broken = true;
+                       return false;
+               }
+               if (num_ready == 0) {
+                       /* the descriptor has been closed or we have all our data */
+                       break;
+               }
+
 
                if (!ctdb->in) {
                        ctdb->in = new_io_elem(sizeof(struct ctdb_req_header));
@@ -439,13 +453,21 @@ bool ctdb_service(struct ctdb_connection *ctdb, int revents)
                        return false;
                } else if (ret < 0) {
                        /* No progress, stop loop. */
-                       revents = 0;
+                       break;
                } else if (io_elem_finished(ctdb->in)) {
-                       handle_incoming(ctdb, ctdb->in);
+                       io_elem_queue(ctdb, ctdb->in);
                        ctdb->in = NULL;
                }
        }
 
+
+       while (ctdb->inqueue != NULL) {
+               struct io_elem *io = ctdb->inqueue;
+
+               io_elem_dequeue(ctdb, io);
+               handle_incoming(ctdb, io);
+       }
+
        return true;
 }