#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/ioctl.h>
#include "libctdb_private.h"
#include "io_elem.h"
#include "local_tdb.h"
while (revents & POLLIN) {
int ret;
+ int num_ready = 0;
+
+ if (ioctl(ctdb->fd, FIONREAD, &num_ready) != 0) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_service: ioctl(FIONREAD) %d", errno);
+ ctdb->broken = true;
+ return false;
+ }
+ if (num_ready == 0) {
+ /* the descriptor has been closed or we have all our data */
+ break;
+ }
+
if (!ctdb->in) {
ctdb->in = new_io_elem(sizeof(struct ctdb_req_header));
return false;
} else if (ret < 0) {
/* No progress, stop loop. */
- revents = 0;
+ break;
} else if (io_elem_finished(ctdb->in)) {
- handle_incoming(ctdb, ctdb->in);
+ io_elem_queue(ctdb, ctdb->in);
ctdb->in = NULL;
}
}
+
+ while (ctdb->inqueue != NULL) {
+ struct io_elem *io = ctdb->inqueue;
+
+ io_elem_dequeue(ctdb, io);
+ handle_incoming(ctdb, io);
+ }
+
return true;
}
#include <unistd.h>
#include <errno.h>
#include <stdlib.h>
+#include "libctdb_private.h"
#include "io_elem.h"
#include <tdb.h>
#include <netinet/in.h>
+#include <dlinklist.h>
#include <ctdb_protocol.h> // For CTDB_DS_ALIGNMENT and ctdb_req_header
struct io_elem {
+ struct io_elem *next, *prev;
size_t len, off;
char *data;
};
}
elem->len = len;
elem->off = 0;
+ elem->next = NULL;
+ elem->prev = NULL;
return elem;
}
{
io->off = 0;
}
+
+void io_elem_queue(struct ctdb_connection *ctdb, struct io_elem *io)
+{
+ DLIST_ADD_END(ctdb->inqueue, io, struct io_elem);
+}
+
+void io_elem_dequeue(struct ctdb_connection *ctdb, struct io_elem *io)
+{
+ DLIST_REMOVE(ctdb->inqueue, io);
+}
+
/* Returns -1 if we hit an error. Otherwise bytes written. */
int write_io_elem(int fd, struct io_elem *io);
+/* Queues a received io element for later processing */
+void io_elem_queue(struct ctdb_connection *ctdb, struct io_elem *io);
+
+/* Removes an element from the queue */
+void io_elem_dequeue(struct ctdb_connection *ctdb, struct io_elem *io);
+
#endif /* _LIBCTDB_IO_ELEM_H */
struct ctdb_request *outq;
/* Finished outgoings (awaiting response) */
struct ctdb_request *doneq;
+
/* Current incoming. */
struct io_elem *in;
+ /* Queue of received pdus */
+ struct io_elem *inqueue;
+
/* Guess at a good reqid to try next. */
uint32_t next_id;
/* List of messages */
return false;
}
- io_elem_init_req_header(req->io,
+ io_elem_init_req_header(req->pdu,
CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb));
pkt = req->hdr.message;