LibCTDB
authorRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 10 Dec 2010 02:39:18 +0000 (13:39 +1100)
committerRonnie Sahlberg <ronniesahlberg@gmail.com>
Fri, 10 Dec 2010 02:39:18 +0000 (13:39 +1100)
Add an input queue where we keep received pdus we have not yet processed
This allows us to perform SYNC calls from an ASYNC callback

libctdb/ctdb.c
libctdb/io_elem.c
libctdb/io_elem.h
libctdb/libctdb_private.h
libctdb/messages.c

index 23acef4ada9ae6b26c0668b5da8471df7d3afc03..264952485797a065fb3d689ef20dc6ed2face204 100644 (file)
@@ -24,6 +24,7 @@
 #include <stdlib.h>
 #include <sys/socket.h>
 #include <sys/un.h>
+#include <sys/ioctl.h>
 #include "libctdb_private.h"
 #include "io_elem.h"
 #include "local_tdb.h"
@@ -417,6 +418,19 @@ bool ctdb_service(struct ctdb_connection *ctdb, int revents)
 
        while (revents & POLLIN) {
                int ret;
+               int num_ready = 0;
+
+               if (ioctl(ctdb->fd, FIONREAD, &num_ready) != 0) {
+                       DEBUG(ctdb, LOG_ERR,
+                             "ctdb_service: ioctl(FIONREAD) %d", errno);
+                       ctdb->broken = true;
+                       return false;
+               }
+               if (num_ready == 0) {
+                       /* the descriptor has been closed or we have all our data */
+                       break;
+               }
+
 
                if (!ctdb->in) {
                        ctdb->in = new_io_elem(sizeof(struct ctdb_req_header));
@@ -439,13 +453,21 @@ bool ctdb_service(struct ctdb_connection *ctdb, int revents)
                        return false;
                } else if (ret < 0) {
                        /* No progress, stop loop. */
-                       revents = 0;
+                       break;
                } else if (io_elem_finished(ctdb->in)) {
-                       handle_incoming(ctdb, ctdb->in);
+                       io_elem_queue(ctdb, ctdb->in);
                        ctdb->in = NULL;
                }
        }
 
+
+       while (ctdb->inqueue != NULL) {
+               struct io_elem *io = ctdb->inqueue;
+
+               io_elem_dequeue(ctdb, io);
+               handle_incoming(ctdb, io);
+       }
+
        return true;
 }
 
index bff21cb313cba268fffbffcf78b4a4af68c41044..81d44e4369a528409bc9f630e022f35211cbf178 100644 (file)
 #include <unistd.h>
 #include <errno.h>
 #include <stdlib.h>
+#include "libctdb_private.h"
 #include "io_elem.h"
 #include <tdb.h>
 #include <netinet/in.h>
+#include <dlinklist.h>
 #include <ctdb_protocol.h> // For CTDB_DS_ALIGNMENT and ctdb_req_header
 
 struct io_elem {
+       struct io_elem *next, *prev;
        size_t len, off;
        char *data;
 };
@@ -55,6 +58,8 @@ struct io_elem *new_io_elem(size_t len)
        }
        elem->len = len;
        elem->off = 0;
+       elem->next = NULL;
+       elem->prev = NULL;
        return elem;
 }
 
@@ -145,3 +150,14 @@ void io_elem_reset(struct io_elem *io)
 {
        io->off = 0;
 }
+
+void io_elem_queue(struct ctdb_connection *ctdb, struct io_elem *io)
+{
+       DLIST_ADD_END(ctdb->inqueue, io, struct io_elem);
+}
+
+void io_elem_dequeue(struct ctdb_connection *ctdb, struct io_elem *io)
+{
+       DLIST_REMOVE(ctdb->inqueue, io);
+}
+
index e774cdbd369a8d7cfd45db3e86674c3061a53414..dd65fcd051b63c33dfa03e5f2d05d270e69f08a0 100644 (file)
@@ -32,4 +32,10 @@ int read_io_elem(int fd, struct io_elem *io);
 /* Returns -1 if we hit an error.  Otherwise bytes written. */
 int write_io_elem(int fd, struct io_elem *io);
 
+/* Queues a received io element for later processing */
+void io_elem_queue(struct ctdb_connection *ctdb, struct io_elem *io);
+
+/* Removes an element from the queue */
+void io_elem_dequeue(struct ctdb_connection *ctdb, struct io_elem *io);
+
 #endif /* _LIBCTDB_IO_ELEM_H */
index 8ecfb0ac7dc14b976187c218048db19132657df6..24bd9828a5c0a43cf31635d19434e486bd6d4020 100644 (file)
@@ -59,8 +59,12 @@ struct ctdb_connection {
        struct ctdb_request *outq;
        /* Finished outgoings (awaiting response) */
        struct ctdb_request *doneq;
+
        /* Current incoming. */
        struct io_elem *in;
+       /* Queue of received pdus */
+       struct io_elem *inqueue;
+
        /* Guess at a good reqid to try next. */
        uint32_t next_id;
        /* List of messages */
index d61d29e16f592cc0fa019279da59bd026dfb966d..f28ac4a61bee257eaffea05ca00b9651ffac0454 100644 (file)
@@ -202,7 +202,7 @@ bool ctdb_send_message(struct ctdb_connection *ctdb,
                return false;
        }
 
-       io_elem_init_req_header(req->io,
+       io_elem_init_req_header(req->pdu,
                                CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
 
        pkt = req->hdr.message;