From: Ronnie Sahlberg Date: Fri, 10 Dec 2010 02:39:18 +0000 (+1100) Subject: LibCTDB X-Git-Url: http://git.samba.org/?p=sahlberg%2Fctdb.git;a=commitdiff_plain;h=5a46d95b8d4dbc1e5aea3fb73912ac6ee49cd182 LibCTDB Add an input queue where we keep received pdus we have not yet processed This allows us to perform SYNC calls from an ASYNC callback --- diff --git a/libctdb/ctdb.c b/libctdb/ctdb.c index 23acef4a..26495248 100644 --- a/libctdb/ctdb.c +++ b/libctdb/ctdb.c @@ -24,6 +24,7 @@ #include #include #include +#include #include "libctdb_private.h" #include "io_elem.h" #include "local_tdb.h" @@ -417,6 +418,19 @@ bool ctdb_service(struct ctdb_connection *ctdb, int revents) while (revents & POLLIN) { int ret; + int num_ready = 0; + + if (ioctl(ctdb->fd, FIONREAD, &num_ready) != 0) { + DEBUG(ctdb, LOG_ERR, + "ctdb_service: ioctl(FIONREAD) %d", errno); + ctdb->broken = true; + return false; + } + if (num_ready == 0) { + /* the descriptor has been closed or we have all our data */ + break; + } + if (!ctdb->in) { ctdb->in = new_io_elem(sizeof(struct ctdb_req_header)); @@ -439,13 +453,21 @@ bool ctdb_service(struct ctdb_connection *ctdb, int revents) return false; } else if (ret < 0) { /* No progress, stop loop. */ - revents = 0; + break; } else if (io_elem_finished(ctdb->in)) { - handle_incoming(ctdb, ctdb->in); + io_elem_queue(ctdb, ctdb->in); ctdb->in = NULL; } } + + while (ctdb->inqueue != NULL) { + struct io_elem *io = ctdb->inqueue; + + io_elem_dequeue(ctdb, io); + handle_incoming(ctdb, io); + } + return true; } diff --git a/libctdb/io_elem.c b/libctdb/io_elem.c index bff21cb3..81d44e43 100644 --- a/libctdb/io_elem.c +++ b/libctdb/io_elem.c @@ -23,12 +23,15 @@ #include #include #include +#include "libctdb_private.h" #include "io_elem.h" #include #include +#include #include // For CTDB_DS_ALIGNMENT and ctdb_req_header struct io_elem { + struct io_elem *next, *prev; size_t len, off; char *data; }; @@ -55,6 +58,8 @@ struct io_elem *new_io_elem(size_t len) } elem->len = len; elem->off = 0; + elem->next = NULL; + elem->prev = NULL; return elem; } @@ -145,3 +150,14 @@ void io_elem_reset(struct io_elem *io) { io->off = 0; } + +void io_elem_queue(struct ctdb_connection *ctdb, struct io_elem *io) +{ + DLIST_ADD_END(ctdb->inqueue, io, struct io_elem); +} + +void io_elem_dequeue(struct ctdb_connection *ctdb, struct io_elem *io) +{ + DLIST_REMOVE(ctdb->inqueue, io); +} + diff --git a/libctdb/io_elem.h b/libctdb/io_elem.h index e774cdbd..dd65fcd0 100644 --- a/libctdb/io_elem.h +++ b/libctdb/io_elem.h @@ -32,4 +32,10 @@ int read_io_elem(int fd, struct io_elem *io); /* Returns -1 if we hit an error. Otherwise bytes written. */ int write_io_elem(int fd, struct io_elem *io); +/* Queues a received io element for later processing */ +void io_elem_queue(struct ctdb_connection *ctdb, struct io_elem *io); + +/* Removes an element from the queue */ +void io_elem_dequeue(struct ctdb_connection *ctdb, struct io_elem *io); + #endif /* _LIBCTDB_IO_ELEM_H */ diff --git a/libctdb/libctdb_private.h b/libctdb/libctdb_private.h index 8ecfb0ac..24bd9828 100644 --- a/libctdb/libctdb_private.h +++ b/libctdb/libctdb_private.h @@ -59,8 +59,12 @@ struct ctdb_connection { struct ctdb_request *outq; /* Finished outgoings (awaiting response) */ struct ctdb_request *doneq; + /* Current incoming. */ struct io_elem *in; + /* Queue of received pdus */ + struct io_elem *inqueue; + /* Guess at a good reqid to try next. */ uint32_t next_id; /* List of messages */ diff --git a/libctdb/messages.c b/libctdb/messages.c index d61d29e1..f28ac4a6 100644 --- a/libctdb/messages.c +++ b/libctdb/messages.c @@ -202,7 +202,7 @@ bool ctdb_send_message(struct ctdb_connection *ctdb, return false; } - io_elem_init_req_header(req->io, + io_elem_init_req_header(req->pdu, CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb)); pkt = req->hdr.message;