#include <stdlib.h>
#include <sys/socket.h>
#include <sys/un.h>
+#include <sys/ioctl.h>
#include "libctdb_private.h"
#include "io_elem.h"
#include "local_tdb.h"
ctdb_rrl_callback_t callback;
};
+struct ctdb_db {
+ struct ctdb_connection *ctdb;
+ bool persistent;
+ uint32_t tdb_flags;
+ uint32_t id;
+ struct tdb_context *tdb;
+
+ ctdb_callback_t callback;
+ void *private_data;
+};
+
static void remove_lock(struct ctdb_connection *ctdb, struct ctdb_lock *lock)
{
DLIST_REMOVE(ctdb->locks, lock);
DLIST_ADD(ctdb->locks, lock);
}
+static void cleanup_locks(struct ctdb_connection *ctdb, struct ctdb_db *db)
+{
+ struct ctdb_lock *i, *next;
+
+ for (i = ctdb->locks; i; i = next) {
+ /* Grab next pointer, as release_lock will free i */
+ next = i->next;
+ if (i->ctdb_db == db) {
+ ctdb_release_lock(db, i);
+ }
+ }
+}
+
/* FIXME: Could be in shared util code with rest of ctdb */
static void close_noerr(int fd)
{
ctdb->outq = NULL;
ctdb->doneq = NULL;
ctdb->in = NULL;
+ ctdb->inqueue = NULL;
ctdb->message_handlers = NULL;
ctdb->next_id = 0;
ctdb->broken = false;
return NULL;
}
+void ctdb_disconnect(struct ctdb_connection *ctdb)
+{
+ struct ctdb_request *i;
+
+ DEBUG(ctdb, LOG_DEBUG, "ctdb_disconnect");
+
+ while ((i = ctdb->outq) != NULL) {
+ DLIST_REMOVE(ctdb->outq, i);
+ ctdb_request_free(ctdb, i);
+ }
+
+ while ((i = ctdb->doneq) != NULL) {
+ DLIST_REMOVE(ctdb->doneq, i);
+ ctdb_request_free(ctdb, i);
+ }
+
+ if (ctdb->in)
+ free_io_elem(ctdb->in);
+
+ remove_message_handlers(ctdb);
+
+ close(ctdb->fd);
+ /* Just in case they try to reuse */
+ ctdb->fd = -1;
+ free(ctdb);
+}
+
int ctdb_get_fd(struct ctdb_connection *ctdb)
{
return ctdb->fd;
void ctdb_request_free(struct ctdb_connection *ctdb, struct ctdb_request *req)
{
+ if (req->next || req->prev) {
+ DEBUG(ctdb, LOG_ALERT,
+ "ctdb_request_free: request not complete! ctdb_cancel? %p (id %u)",
+ req, req->hdr.hdr ? req->hdr.hdr->reqid : 0);
+ ctdb_cancel(ctdb, req);
+ return;
+ }
if (req->extra_destructor) {
req->extra_destructor(ctdb, req);
}
while (revents & POLLIN) {
int ret;
+ int num_ready = 0;
+
+ if (ioctl(ctdb->fd, FIONREAD, &num_ready) != 0) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_service: ioctl(FIONREAD) %d", errno);
+ ctdb->broken = true;
+ return false;
+ }
+ if (num_ready == 0) {
+ /* the descriptor has been closed or we have all our data */
+ break;
+ }
+
if (!ctdb->in) {
ctdb->in = new_io_elem(sizeof(struct ctdb_req_header));
return false;
} else if (ret < 0) {
/* No progress, stop loop. */
- revents = 0;
+ break;
} else if (io_elem_finished(ctdb->in)) {
- handle_incoming(ctdb, ctdb->in);
+ io_elem_queue(ctdb, ctdb->in);
ctdb->in = NULL;
}
}
+
+ while (ctdb->inqueue != NULL) {
+ struct io_elem *io = ctdb->inqueue;
+
+ io_elem_dequeue(ctdb, io);
+ handle_incoming(ctdb, io);
+ }
+
return true;
}
void ctdb_cancel(struct ctdb_connection *ctdb, struct ctdb_request *req)
{
+ if (!req->next && !req->prev) {
+ DEBUG(ctdb, LOG_ALERT,
+ "ctdb_cancel: request completed! ctdb_request_free? %p (id %u)",
+ req, req->hdr.hdr ? req->hdr.hdr->reqid : 0);
+ ctdb_request_free(ctdb, req);
+ return;
+ }
+
DEBUG(ctdb, LOG_DEBUG, "ctdb_cancel: %p (id %u)",
req, req->hdr.hdr ? req->hdr.hdr->reqid : 0);
req->callback = ctdb_cancel_callback;
}
-struct ctdb_db {
- struct ctdb_connection *ctdb;
- bool persistent;
- uint32_t tdb_flags;
- uint32_t id;
- struct tdb_context *tdb;
-
- ctdb_callback_t callback;
- void *private_data;
-};
+void ctdb_detachdb(struct ctdb_connection *ctdb, struct ctdb_db *db)
+{
+ cleanup_locks(ctdb, db);
+ tdb_close(db->tdb);
+ free(db);
+}
static void attachdb_getdbpath_done(struct ctdb_connection *ctdb,
struct ctdb_request *req,
/* Now it's their responsibility to free lock & request! */
req->extra_destructor = NULL;
lock->callback(lock->ctdb_db, lock, data, private);
+ ctdb_request_free(ctdb, req);
return;
}
return false;
}
}
+
+
+struct ctdb_traverse_state {
+ struct ctdb_request *handle;
+ struct ctdb_db *ctdb_db;
+ uint64_t srvid;
+
+ ctdb_traverse_callback_t callback;
+ void *cbdata;
+};
+
+static void traverse_remhnd_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+
+ if (!ctdb_remove_message_handler_recv(ctdb, state->handle)) {
+ DEBUG(ctdb, LOG_ERR,
+ "Failed to remove message handler for"
+ " traverse.");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ }
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+ free(state);
+}
+
+static void msg_h(struct ctdb_connection *ctdb, uint64_t srvid,
+ TDB_DATA data, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+ struct ctdb_db *ctdb_db = state->ctdb_db;
+ struct ctdb_rec_data *d = (struct ctdb_rec_data *)data.dptr;
+ TDB_DATA key;
+
+ if (data.dsize < sizeof(uint32_t) ||
+ d->length != data.dsize) {
+ DEBUG(ctdb, LOG_ERR,
+ "Bad data size %u in traverse_handler",
+ (unsigned)data.dsize);
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+
+ key.dsize = d->keylen;
+ key.dptr = &d->data[0];
+ data.dsize = d->datalen;
+ data.dptr = &d->data[d->keylen];
+
+ if (key.dsize == 0 && data.dsize == 0) {
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_FINISHED,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+
+ if (data.dsize <= sizeof(struct ctdb_ltdb_header)) {
+ /* empty records are deleted records in ctdb */
+ return;
+ }
+
+ data.dsize -= sizeof(struct ctdb_ltdb_header);
+ data.dptr += sizeof(struct ctdb_ltdb_header);
+
+ if (state->callback(ctdb, ctdb_db,
+ TRAVERSE_STATUS_RECORD,
+ key, data, state->cbdata) != 0) {
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+}
+
+static void traverse_start_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+}
+
+static void traverse_msghnd_cb(struct ctdb_connection *ctdb,
+ struct ctdb_request *req, void *private_data)
+{
+ struct ctdb_traverse_state *state = private_data;
+ struct ctdb_db *ctdb_db = state->ctdb_db;
+ struct ctdb_traverse_start t;
+
+ if (!ctdb_set_message_handler_recv(ctdb, state->handle)) {
+ DEBUG(ctdb, LOG_ERR,
+ "Failed to register message handler for"
+ " traverse.");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+ free(state);
+ return;
+ }
+ ctdb_request_free(ctdb, state->handle);
+ state->handle = NULL;
+
+ t.db_id = ctdb_db->id;
+ t.srvid = state->srvid;
+ t.reqid = 0;
+
+ state->handle = new_ctdb_control_request(ctdb,
+ CTDB_CONTROL_TRAVERSE_START,
+ CTDB_CURRENT_NODE,
+ &t, sizeof(t),
+ traverse_start_cb, state);
+ if (state->handle == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async:"
+ " failed to send traverse_start control");
+ state->callback(state->ctdb_db->ctdb, state->ctdb_db,
+ TRAVERSE_STATUS_ERROR,
+ tdb_null, tdb_null,
+ state->cbdata);
+ state->handle = ctdb_remove_message_handler_send(
+ state->ctdb_db->ctdb, state->srvid,
+ msg_h, state,
+ traverse_remhnd_cb, state);
+ return;
+ }
+}
+
+bool ctdb_traverse_async(struct ctdb_db *ctdb_db,
+ ctdb_traverse_callback_t callback, void *cbdata)
+{
+ struct ctdb_connection *ctdb = ctdb_db->ctdb;
+ struct ctdb_traverse_state *state;
+ static uint32_t tid = 0;
+
+ state = malloc(sizeof(struct ctdb_traverse_state));
+ if (state == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async: no memory."
+ " allocate state failed");
+ return false;
+ }
+
+ tid++;
+ state->srvid = CTDB_SRVID_TRAVERSE_RANGE|tid;
+
+ state->callback = callback;
+ state->cbdata = cbdata;
+ state->ctdb_db = ctdb_db;
+
+ state->handle = ctdb_set_message_handler_send(ctdb_db->ctdb,
+ state->srvid,
+ msg_h, state,
+ traverse_msghnd_cb, state);
+ if (state->handle == NULL) {
+ DEBUG(ctdb, LOG_ERR,
+ "ctdb_traverse_async:"
+ " failed ctdb_set_message_handler_send");
+ free(state);
+ return false;
+ }
+
+ return true;
+}