From: Rusty Russell Date: Fri, 21 May 2010 02:37:41 +0000 (+0930) Subject: libctdb: first cut, supports getrecmaster only X-Git-Tag: ctdb-1.9.1~66^2~5 X-Git-Url: http://git.samba.org/?p=rusty%2Fctdb.git;a=commitdiff_plain;h=732cf6ead4aa2fbefc2b81328171f52c8d935341 libctdb: first cut, supports getrecmaster only This is a completely standalone library using only ctdb_protocol.h. Signed-off-by: Rusty Russell Header from folded patch 'libctdb-message-handling.patch': libctdb: add message handling to libctdb. Now clients can send and receive ctdb messages. --- diff --git a/libctdb/control.c b/libctdb/control.c new file mode 100644 index 00000000..84d703e8 --- /dev/null +++ b/libctdb/control.c @@ -0,0 +1,55 @@ +/* + Misc control routines of libctdb + + Copyright (C) Rusty Russell 2010 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . +*/ +#include +#include +#include "libctdb_private.h" + +struct ctdb_request *ctdb_getrecmaster_send(struct ctdb_connection *ctdb, + uint32_t destnode, + ctdb_getrecmaster_cb callback, + void *private_data) +{ + struct ctdb_request *req; + + req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_RECMASTER, + destnode, NULL, 0); + if (!req) + return NULL; + req->callback.getrecmaster = callback; + req->priv_data = private_data; + return req; +} + +struct ctdb_request * +ctdb_getpnn_send(struct ctdb_connection *ctdb, + uint32_t destnode, + ctdb_getpnn_cb callback, + void *private_data) +{ + struct ctdb_request *req; + + req = new_ctdb_control_request(ctdb, CTDB_CONTROL_GET_PNN, destnode, + NULL, 0); + if (!req) { + return NULL; + } + req->callback.getpnn = callback; + req->priv_data = private_data; + return req; +} diff --git a/libctdb/ctdb.c b/libctdb/ctdb.c new file mode 100644 index 00000000..85875ac4 --- /dev/null +++ b/libctdb/ctdb.c @@ -0,0 +1,643 @@ +/* + core of libctdb + + Copyright (C) Rusty Russell 2010 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . +*/ +#include +#include +#include +#include +#include +#include +#include +#include +#include "libctdb_private.h" +#include "io_elem.h" +#include "local_tdb.h" +#include "messages.h" +#include +#include + +/* FIXME: Could be in shared util code with rest of ctdb */ +static void close_noerr(int fd) +{ + int olderr = errno; + close(fd); + errno = olderr; +} + +/* FIXME: Could be in shared util code with rest of ctdb */ +static void free_noerr(void *p) +{ + int olderr = errno; + free(p); + errno = olderr; +} + +/* FIXME: Could be in shared util code with rest of ctdb */ +static void set_nonblocking(int fd) +{ + unsigned v; + v = fcntl(fd, F_GETFL, 0); + fcntl(fd, F_SETFL, v | O_NONBLOCK); +} + +/* FIXME: Could be in shared util code with rest of ctdb */ +static void set_close_on_exec(int fd) +{ + unsigned v; + v = fcntl(fd, F_GETFD, 0); + fcntl(fd, F_SETFD, v | FD_CLOEXEC); +} + +static void set_pnn(int32_t status, uint32_t pnn, void *private_data) +{ + if (status != 0) { + /* FIXME: Report error. */ + ((struct ctdb_connection *)private_data)->broken = true; + } else { + ((struct ctdb_connection *)private_data)->pnn = pnn; + } +} + +struct ctdb_connection *ctdb_connect(const char *addr) +{ + struct ctdb_connection *ctdb; + struct sockaddr_un sun; + + ctdb = malloc(sizeof(*ctdb)); + if (!ctdb) + goto fail; + ctdb->outq = NULL; + ctdb->doneq = NULL; + ctdb->immediateq = NULL; + ctdb->in = NULL; + ctdb->message_handlers = NULL; + + memset(&sun, 0, sizeof(sun)); + sun.sun_family = AF_UNIX; + if (!addr) + addr = CTDB_PATH; + strncpy(sun.sun_path, addr, sizeof(sun.sun_path)); + ctdb->fd = socket(AF_UNIX, SOCK_STREAM, 0); + if (ctdb->fd < 0) + goto free_fail; + + set_nonblocking(ctdb->fd); + set_close_on_exec(ctdb->fd); + + if (connect(ctdb->fd, (struct sockaddr *)&sun, sizeof(sun)) == -1) + goto close_fail; + + /* Immediately queue a request to get our pnn. */ + if (!ctdb_getpnn_send(ctdb, CTDB_CURRENT_NODE, set_pnn, ctdb)) + goto close_fail; + + return ctdb; + +close_fail: + close_noerr(ctdb->fd); +free_fail: + free_noerr(ctdb); +fail: + return NULL; +} + +int ctdb_get_fd(struct ctdb_connection *ctdb) +{ + return ctdb->fd; +} + +int ctdb_which_events(struct ctdb_connection *ctdb) +{ + int events = POLLIN; + + if (ctdb->outq) + events |= POLLOUT; + return events; +} + +struct ctdb_request *new_ctdb_request(size_t len) +{ + struct ctdb_request *req = malloc(sizeof(*req)); + if (!req) + return NULL; + req->io = new_io_elem(len); + if (!req->io) { + free(req); + return NULL; + } + req->hdr.hdr = io_elem_data(req->io, NULL); + req->cancelled = false; + return req; +} + +static struct ctdb_request *new_immediate_request(void) +{ + struct ctdb_request *req = malloc(sizeof(*req)); + if (!req) + return NULL; + req->cancelled = false; + req->io = NULL; + req->hdr.hdr = NULL; + return req; +} + +static void free_ctdb_request(struct ctdb_request *req) +{ + /* immediate requests don't have IO */ + if (req->io) { + free_io_elem(req->io); + } + free(req); +} + +static void handle_call_reply(struct ctdb_connection *ctdb, + struct ctdb_req_header *hdr, + struct ctdb_request *i) +{ + struct ctdb_req_call *call = i->hdr.call; + struct ctdb_reply_call *reply = (struct ctdb_reply_call *)hdr; + + switch (call->callid) { + case CTDB_NULL_FUNC: + /* FIXME: We should let it steal the request, rathe than copy */ + i->callback.nullfunc(reply->status, reply, i->priv_data); + break; + } +} + +static void handle_control_reply(struct ctdb_connection *ctdb, + struct ctdb_req_header *hdr, + struct ctdb_request *i) +{ + struct ctdb_req_control *control = i->hdr.control; + struct ctdb_reply_control *reply = (struct ctdb_reply_control *)hdr; + + switch (control->opcode) { + case CTDB_CONTROL_GET_RECMASTER: + i->callback.getrecmaster(0, reply->status, i->priv_data); + break; + case CTDB_CONTROL_GET_PNN: + i->callback.getpnn(0, reply->status, i->priv_data); + break; + case CTDB_CONTROL_REGISTER_SRVID: + i->callback.register_srvid(reply->status, i->priv_data); + break; + case CTDB_CONTROL_DB_ATTACH_PERSISTENT: + case CTDB_CONTROL_DB_ATTACH: + i->callback.attachdb(reply->status, *(uint32_t *)reply->data, + i->priv_data); + break; + case CTDB_CONTROL_GETDBPATH: + i->callback.getdbpath(reply->status, (char *)reply->data, + i->priv_data); + break; + } +} + +static void handle_incoming(struct ctdb_connection *ctdb, + struct ctdb_req_header *hdr, + size_t len /* FIXME: use len to check packet! */) +{ + struct ctdb_request *i; + + if (hdr->operation == CTDB_REQ_MESSAGE) { + deliver_message(ctdb, hdr); + return; + } + + if (hdr->operation != CTDB_REPLY_CALL + && hdr->operation != CTDB_REPLY_CONTROL) { + /* FIXME: report this error. */ + return; + } + + for (i = ctdb->doneq; i; i = i->next) { + if (i->hdr.hdr->reqid == hdr->reqid) { + if (!i->cancelled) { + if (hdr->operation == CTDB_REPLY_CALL) + handle_call_reply(ctdb, hdr, i); + else + handle_control_reply(ctdb, hdr, i); + } + DLIST_REMOVE(ctdb->doneq, i); + free_ctdb_request(i); + return; + } + } + /* FIXME: report this error. */ +} + +/* Remove "harmless" errors. */ +static ssize_t real_error(ssize_t ret) +{ + if (ret < 0 && (errno == EINTR || errno == EWOULDBLOCK)) + return 0; + return ret; +} + +int ctdb_service(struct ctdb_connection *ctdb, int revents) +{ + if (ctdb->broken) { + return -1; + } + + if (revents & POLLOUT) { + while (ctdb->outq) { + if (real_error(write_io_elem(ctdb->fd, + ctdb->outq->io)) < 0) { + ctdb->broken = true; + return -1; + } + if (io_elem_finished(ctdb->outq->io)) { + struct ctdb_request *done = ctdb->outq; + DLIST_REMOVE(ctdb->outq, done); + if (done->cancelled) { + free_ctdb_request(done); + } else { + DLIST_ADD_END(ctdb->doneq, done, + struct ctdb_request); + } + } + } + } + + while (revents & POLLIN) { + int ret; + + if (!ctdb->in) { + ctdb->in = new_io_elem(sizeof(struct ctdb_req_header)); + if (!ctdb->in) { + ctdb->broken = true; + return -1; + } + } + + ret = read_io_elem(ctdb->fd, ctdb->in); + if (real_error(ret) < 0 || ret == 0) { + /* They closed fd? */ + if (ret == 0) + errno = EBADF; + ctdb->broken = true; + return -1; + } else if (ret < 0) { + /* No progress, stop loop. */ + revents = 0; + } else if (io_elem_finished(ctdb->in)) { + struct ctdb_req_header *hdr; + size_t len; + + hdr = io_elem_data(ctdb->in, &len); + handle_incoming(ctdb, hdr, len); + free_io_elem(ctdb->in); + ctdb->in = NULL; + } + } + + while (ctdb->immediateq) { + struct ctdb_request *imm = ctdb->immediateq; + /* This has to handle fake->cancelled internally. */ + imm->callback.immediate(imm, imm->priv_data); + DLIST_REMOVE(ctdb->immediateq, imm); + free_ctdb_request(imm); + } + + return 0; +} + +/* This is inefficient. We could pull in idtree.c. */ +static bool reqid_used(const struct ctdb_connection *ctdb, uint32_t reqid) +{ + struct ctdb_request *i; + + for (i = ctdb->outq; i; i = i->next) { + if (i->hdr.hdr->reqid == reqid) { + return true; + } + } + for (i = ctdb->doneq; i; i = i->next) { + if (i->hdr.hdr->reqid == reqid) { + return true; + } + } + return false; +} + +uint32_t new_reqid(struct ctdb_connection *ctdb) +{ + while (reqid_used(ctdb, ctdb->next_id)) { + ctdb->next_id++; + } + return ctdb->next_id++; +} + +struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb, + uint32_t opcode, + uint32_t destnode, + const void *extra_data, + size_t extra) +{ + struct ctdb_request *req; + struct ctdb_req_control *pkt; + + req = new_ctdb_request(sizeof(*pkt) + extra); + if (!req) + return NULL; + + io_elem_init_req_header(req->io, + CTDB_REQ_CONTROL, destnode, new_reqid(ctdb)); + + pkt = req->hdr.control; + pkt->opcode = opcode; + pkt->srvid = 0; + pkt->client_id = 0; + pkt->flags = 0; + pkt->datalen = extra; + memcpy(pkt->data, extra_data, extra); + DLIST_ADD_END(ctdb->outq, req, struct ctdb_request); + return req; +} + +int ctdb_cancel(struct ctdb_request *req) +{ + /* FIXME: If it's not sent, we could just free it right now. */ + req->cancelled = true; + return 0; +} + +struct ctdb_db { + struct ctdb_connection *ctdb; + bool persistent; + uint32_t tdb_flags; + uint32_t id; + struct tdb_context *tdb; + + ctdb_attachdb_cb callback; + void *private_data; +}; + +static void attachdb_getdbpath_done(int status, const char *path, + void *_db) +{ + struct ctdb_db *db = _db; + uint32_t tdb_flags = db->tdb_flags; + + if (status != 0) { + db->callback(status, NULL, db->private_data); + free(db); + return; + } + + tdb_flags = db->persistent ? TDB_DEFAULT : TDB_NOSYNC; + tdb_flags |= TDB_DISALLOW_NESTING; + + db->tdb = tdb_open(path, 0, tdb_flags, O_RDWR, 0); + if (db->tdb == NULL) { + db->callback(-1, NULL, db->private_data); + free(db); + return; + } + + /* Finally, we tell the client that we opened the db. */ + db->callback(status, db, db->private_data); +} + +static void attachdb_done(int status, uint32_t id, struct ctdb_db *db) +{ + struct ctdb_request *req; + + if (status != 0) { + db->callback(status, NULL, db->private_data); + free(db); + return; + } + db->id = id; + + /* Now we do another call, to get the dbpath. */ + req = new_ctdb_control_request(db->ctdb, CTDB_CONTROL_GETDBPATH, + CTDB_CURRENT_NODE, &id, sizeof(id)); + if (!req) { + db->callback(-1, NULL, db->private_data); + free(db); + return; + } + req->callback.getdbpath = attachdb_getdbpath_done; + req->priv_data = db; +} + +struct ctdb_request * +ctdb_attachdb_send(struct ctdb_connection *ctdb, + const char *name, int persistent, uint32_t tdb_flags, + ctdb_attachdb_cb callback, + void *private_data) +{ + struct ctdb_request *req; + struct ctdb_db *db; + uint32_t opcode; + + /* FIXME: Search if db already open. */ + + db = malloc(sizeof(*db)); + if (!db) { + return NULL; + } + + if (persistent) { + opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT; + } else { + opcode = CTDB_CONTROL_DB_ATTACH; + } + + req = new_ctdb_control_request(ctdb, opcode, CTDB_CURRENT_NODE, name, + strlen(name) + 1); + if (!req) { + free(db); + return NULL; + } + + db->ctdb = ctdb; + db->tdb_flags = tdb_flags; + db->persistent = persistent; + db->callback = callback; + db->private_data = private_data; + + req->callback.attachdb = attachdb_done; + req->priv_data = db; + + /* Flags get overloaded into srvid. */ + req->hdr.control->srvid = tdb_flags; + return req; +} + +struct ctdb_lock { + struct ctdb_db *ctdb_db; + TDB_DATA key; + struct ctdb_ltdb_header *hdr; + TDB_DATA data; + bool held; + /* For convenience, we stash this here. */ + ctdb_readrecordlock_cb callback; + void *private_data; +}; + +void ctdb_release_lock(struct ctdb_lock *lock) +{ + if (lock->held) { + tdb_chainunlock(lock->ctdb_db->tdb, lock->key); + } + free(lock->key.dptr); + free(lock->hdr); /* Also frees lock->data */ + free(lock); +} + +/* We keep the lock if local node is the dmaster. */ +static bool try_readrecordlock(struct ctdb_lock *lock) +{ + if (tdb_chainlock(lock->ctdb_db->tdb, lock->key) != 0) { + return false; + } + + lock->hdr = ctdb_local_fetch(lock->ctdb_db->tdb, + lock->key, &lock->data); + if (lock->hdr && lock->hdr->dmaster == lock->ctdb_db->ctdb->pnn) { + lock->held = true; + return true; + } + + tdb_chainunlock(lock->ctdb_db->tdb, lock->key); + free(lock->hdr); + return false; +} + +static void readrecordlock_done(int, struct ctdb_reply_call *, void *); + +static struct ctdb_request *new_readrecordlock_request(struct ctdb_lock *lock) +{ + struct ctdb_request *req; + struct ctdb_req_call *pkt; + + req = new_ctdb_request(sizeof(*pkt) + lock->key.dsize); + if (!req) + return NULL; + req->callback.nullfunc = readrecordlock_done; + req->priv_data = lock; + + io_elem_init_req_header(req->io, CTDB_REQ_CALL, CTDB_CURRENT_NODE, + new_reqid(lock->ctdb_db->ctdb)); + + pkt = req->hdr.call; + pkt->flags = CTDB_IMMEDIATE_MIGRATION; + pkt->db_id = lock->ctdb_db->id; + pkt->callid = CTDB_NULL_FUNC; + pkt->hopcount = 0; + pkt->keylen = lock->key.dsize; + pkt->calldatalen = 0; + memcpy(pkt->data, lock->key.dptr, lock->key.dsize); + DLIST_ADD_END(lock->ctdb_db->ctdb->outq, req, struct ctdb_request); + return req; +} + +/* OK, let's try again... */ +static void readrecordlock_done(int status, struct ctdb_reply_call *reply, + void *_lock) +{ + struct ctdb_lock *lock = _lock; + + if (status != 0) { + lock->callback(status, NULL, tdb_null, lock->private_data); + ctdb_release_lock(lock); + return; + } + + if (try_readrecordlock(lock)) { + lock->callback(0, lock, lock->data, lock->private_data); + return; + } + + if (!new_readrecordlock_request(lock)) { + lock->callback(-1, NULL, tdb_null, lock->private_data); + ctdb_release_lock(lock); + } +} + +static void lock_complete(struct ctdb_request *req, void *_lock) +{ + struct ctdb_lock *lock = _lock; + + if (!req->cancelled) { + lock->callback(0, lock, lock->data, lock->private_data); + } else { + ctdb_release_lock(lock); + } +} + +struct ctdb_request * +ctdb_readrecordlock_send(struct ctdb_db *ctdb_db, + TDB_DATA key, + ctdb_readrecordlock_cb callback, + void *private_data) +{ + struct ctdb_request *req; + struct ctdb_lock *lock; + + lock = malloc(sizeof(*lock)); + if (!lock) + return NULL; + lock->key.dptr = malloc(key.dsize); + if (!lock->key.dptr) { + free_noerr(lock); + return NULL; + } + memcpy(lock->key.dptr, key.dptr, key.dsize); + lock->key.dsize = key.dsize; + lock->ctdb_db = ctdb_db; + lock->callback = callback; + lock->private_data = private_data; + lock->hdr = NULL; + lock->held = false; + + if (try_readrecordlock(lock)) { + /* We pretend to be async, so we just queue this. */ + req = new_immediate_request(); + if (!req) { + ctdb_release_lock(lock); + return NULL; + } + req->callback.immediate = lock_complete; + req->priv_data = lock; + DLIST_ADD_END(lock->ctdb_db->ctdb->immediateq, + req, struct ctdb_request); + return req; + } + + req = new_readrecordlock_request(lock); + if (!req) { + ctdb_release_lock(lock); + return NULL; + } + return req; +} + +int ctdb_writerecord(struct ctdb_lock *lock, TDB_DATA data) +{ + if (lock->ctdb_db->persistent) { + /* FIXME: Report error. */ + return -1; + } + + return ctdb_local_store(lock->ctdb_db->tdb, lock->key, lock->hdr, data); +} diff --git a/libctdb/io_elem.c b/libctdb/io_elem.c new file mode 100644 index 00000000..91e84cce --- /dev/null +++ b/libctdb/io_elem.c @@ -0,0 +1,132 @@ +/* + Simple queuing of input and output records for libctdb + + Copyright (C) Rusty Russell 2010 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . +*/ +#include +#include +#include +#include +#include +#include +#include "io_elem.h" +#include +#include // For CTDB_DS_ALIGNMENT and ctdb_req_header + +struct io_elem { + size_t len, off; + char *data; +}; + +struct io_elem *new_io_elem(size_t len) +{ + struct io_elem *elem; + len = (len + (CTDB_DS_ALIGNMENT-1)) & ~(CTDB_DS_ALIGNMENT-1); + + elem = malloc(sizeof(*elem)); + if (!elem) + return NULL; + elem->data = malloc(len); + if (!elem->data) { + free(elem); + return NULL; + } + + elem->len = len; + elem->off = 0; + return elem; +} + +void free_io_elem(struct io_elem *io) +{ + free(io->data); + free(io); +} + +bool io_elem_finished(const struct io_elem *io) +{ + return io->off == io->len; +} + +void io_elem_init_req_header(struct io_elem *io, + uint32_t operation, + uint32_t destnode, + uint32_t reqid) +{ + struct ctdb_req_header *hdr = io_elem_data(io, NULL); + + hdr->length = io->len; + hdr->ctdb_magic = CTDB_MAGIC; + hdr->ctdb_version = CTDB_VERSION; + /* Generation and srcnode only used for inter-ctdbd communication. */ + hdr->generation = 0; + hdr->destnode = destnode; + hdr->srcnode = 0; + hdr->operation = operation; + hdr->reqid = reqid; +} + +/* Access to raw data: if len is non-NULL it is filled in. */ +void *io_elem_data(const struct io_elem *io, size_t *len) +{ + if (len) + *len = io->len; + return io->data; +} + +/* Returns -1 if we hit an error. Errno will be set. */ +int read_io_elem(int fd, struct io_elem *io) +{ + ssize_t ret; + + ret = read(fd, io->data + io->off, io->len - io->off); + if (ret < 0) + return ret; + + io->off += ret; + if (io_elem_finished(io)) { + struct ctdb_req_header *hdr = (void *)io->data; + + /* Finished. But maybe this was just header? */ + if (io->len == sizeof(*hdr) && hdr->length > io->len) { + int reret; + /* Enlarge and re-read. */ + io->len = hdr->length; + io->data = realloc(io->data, io->len); + if (!io->data) + return -1; + /* Try reading again immediately. */ + reret = read_io_elem(fd, io); + if (reret >= 0) + reret += ret; + return reret; + } + } + return ret; +} + +/* Returns -1 if we hit an error. Errno will be set. */ +int write_io_elem(int fd, struct io_elem *io) +{ + ssize_t ret; + + ret = write(fd, io->data + io->off, io->len - io->off); + if (ret < 0) + return ret; + + io->off += ret; + return ret; +} diff --git a/libctdb/io_elem.h b/libctdb/io_elem.h new file mode 100644 index 00000000..5c234fec --- /dev/null +++ b/libctdb/io_elem.h @@ -0,0 +1,32 @@ +#ifndef _LIBCTDB_IO_ELEM_H +#define _LIBCTDB_IO_ELEM_H +#include + +/* Packets are of form: . */ + +/* Create a new queue element of at least len bytes (for reading or writing). + * Len may be rounded up for alignment. */ +struct io_elem *new_io_elem(size_t len); + +/* Free a queue element. */ +void free_io_elem(struct io_elem *io); + +/* If finished, this returns the request header, otherwise NULL. */ +bool io_elem_finished(const struct io_elem *io); + +/* Access to raw data: if len is non-NULL it is filled in. */ +void *io_elem_data(const struct io_elem *io, size_t *len); + +/* Initialise the struct ctdb_req_header at the front of the I/O. */ +void io_elem_init_req_header(struct io_elem *io, + uint32_t operation, + uint32_t destnode, + uint32_t reqid); + +/* Returns -1 if we hit an error. Otherwise bytes read. */ +int read_io_elem(int fd, struct io_elem *io); + +/* Returns -1 if we hit an error. Otherwise bytes written. */ +int write_io_elem(int fd, struct io_elem *io); + +#endif /* _LIBCTDB_IO_ELEM_H */ diff --git a/libctdb/libctdb_private.h b/libctdb/libctdb_private.h new file mode 100644 index 00000000..6e789cf9 --- /dev/null +++ b/libctdb/libctdb_private.h @@ -0,0 +1,63 @@ +#ifndef _LIBCTDB_PRIVATE_H +#define _LIBCTDB_PRIVATE_H +#include +#include +#include +#include +#include + +struct message_handler_info; +struct ctdb_reply_call; + +struct ctdb_request { + struct ctdb_request *next, *prev; + struct io_elem *io; + union { + struct ctdb_req_header *hdr; + struct ctdb_req_call *call; + struct ctdb_req_control *control; + struct ctdb_req_message *message; + } hdr; + bool cancelled; + union { + ctdb_getrecmaster_cb getrecmaster; + ctdb_getpnn_cb getpnn; + void (*register_srvid)(int, struct message_handler_info *); + void (*attachdb)(int, uint32_t id, struct ctdb_db *); + void (*getdbpath)(int, const char *, void *); + void (*nullfunc)(int, struct ctdb_reply_call *, void *); + void (*immediate)(struct ctdb_request *, void *); + } callback; + void *priv_data; +}; + +struct ctdb_connection { + /* Socket to ctdbd. */ + int fd; + /* Currently our failure mode is simple; return -1 from ctdb_service */ + bool broken; + /* Linked list of pending outgoings. */ + struct ctdb_request *outq; + /* Finished outgoings (awaiting response) */ + struct ctdb_request *doneq; + /* Successful sync requests, waiting for next service. */ + struct ctdb_request *immediateq; + /* Current incoming. */ + struct io_elem *in; + /* Guess at a good reqid to try next. */ + uint32_t next_id; + /* List of messages */ + struct message_handler_info *message_handlers; + /* PNN of this ctdb: valid by the time we do our first db connection. */ + uint32_t pnn; +}; + +/* ctdb.c */ +struct ctdb_request *new_ctdb_request(size_t len); +struct ctdb_request *new_ctdb_control_request(struct ctdb_connection *ctdb, + uint32_t opcode, + uint32_t destnode, + const void *extra_data, + size_t extra); +uint32_t new_reqid(struct ctdb_connection *ctdb); +#endif /* _LIBCTDB_PRIVATE_H */ diff --git a/libctdb/local_tdb.c b/libctdb/local_tdb.c new file mode 100644 index 00000000..dbf06ede --- /dev/null +++ b/libctdb/local_tdb.c @@ -0,0 +1,94 @@ +/* + libctdb local tdb access code + + Copyright (C) Andrew Tridgell 2006 + Copyright (C) Rusty Russell 2010 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . +*/ + +#include +#include +#include +#include +#include +#include // For struct ctdb_ltdb_header +#include "local_tdb.h" + +/* + fetch a record from the ltdb, separating out the header information + and returning the body of the record. The caller should free() the + header when done, rather than the (optional) data->dptr. +*/ +struct ctdb_ltdb_header *ctdb_local_fetch(struct tdb_context *tdb, + TDB_DATA key, TDB_DATA *data) +{ + TDB_DATA rec; + + rec = tdb_fetch(tdb, key); + if (rec.dsize < sizeof(struct ctdb_ltdb_header)) { + free(rec.dptr); + return NULL; + } + + if (data) { + data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header); + data->dptr = rec.dptr + sizeof(struct ctdb_ltdb_header); + } + return (struct ctdb_ltdb_header *)rec.dptr; +} + + +/* + write a record to a normal database +*/ +int ctdb_local_store(struct tdb_context *tdb, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA data) +{ + TDB_DATA rec; + int ret; + bool seqnum_suppressed = false; + + rec.dsize = sizeof(*header) + data.dsize; + rec.dptr = malloc(rec.dsize); + if (!rec.dptr) { + return -1; + } + + memcpy(rec.dptr, header, sizeof(*header)); + memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize); + + /* Databases with seqnum updates enabled only get their seqnum + changes when/if we modify the data */ + if (tdb_get_flags(tdb) & TDB_SEQNUM) { + TDB_DATA old; + old = tdb_fetch(tdb, key); + + if ( (old.dsize == rec.dsize) + && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header), + rec.dptr+sizeof(struct ctdb_ltdb_header), + rec.dsize-sizeof(struct ctdb_ltdb_header)) ) { + tdb_remove_flags(tdb, TDB_SEQNUM); + seqnum_suppressed = true; + } + free(old.dptr); + } + ret = tdb_store(tdb, key, rec, TDB_REPLACE); + if (seqnum_suppressed) { + tdb_add_flags(tdb, TDB_SEQNUM); + } + free(rec.dptr); + + return ret; +} diff --git a/libctdb/local_tdb.h b/libctdb/local_tdb.h new file mode 100644 index 00000000..549db7dc --- /dev/null +++ b/libctdb/local_tdb.h @@ -0,0 +1,10 @@ +#ifndef _LIBCTDB_LOCAL_TDB_H +#define _LIBCTDB_LOCAL_TDB_H + +struct ctdb_ltdb_header *ctdb_local_fetch(struct tdb_context *tdb, + TDB_DATA key, TDB_DATA *data); + +int ctdb_local_store(struct tdb_context *tdb, TDB_DATA key, + struct ctdb_ltdb_header *header, TDB_DATA data); + +#endif /* _LIBCTDB_LOCAL_TDB_H */ diff --git a/libctdb/messages.c b/libctdb/messages.c new file mode 100644 index 00000000..6ec35415 --- /dev/null +++ b/libctdb/messages.c @@ -0,0 +1,110 @@ +#include "libctdb_private.h" +#include "messages.h" +#include "io_elem.h" +#include +#include +#include +#include +#include + +struct message_handler_info { + struct message_handler_info *next, *prev; + /* Callback when we're first registered. */ + ctdb_set_message_handler_cb callback; + + uint64_t srvid; + ctdb_message_fn_t handler; + void *private_data; + struct ctdb_connection *ctdb; +}; + +void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr) +{ + struct message_handler_info *i; + struct ctdb_req_message *msg = (struct ctdb_req_message *)hdr; + TDB_DATA data; + + data.dptr = msg->data; + data.dsize = msg->datalen; + + for (i = ctdb->message_handlers; i; i = i->next) { + if (i->srvid == msg->srvid) { + i->handler(ctdb, msg->srvid, data, i->private_data); + } + } + /* FIXME: Report unknown messages */ +} + +static void set_message_handler(int status, struct message_handler_info *info) +{ + /* If registration failed, tell callback and clean up */ + if (status < 0) { + info->callback(status, info->private_data); + free(info); + return; + } else { + /* Put ourselves in list of handlers. */ + DLIST_ADD_END(info->ctdb->message_handlers, info, + struct message_handler_info); + /* Now call callback: it could remove us in theory. */ + info->callback(status, info->private_data); + } +} + +struct ctdb_request * +ctdb_set_message_handler_send(struct ctdb_connection *ctdb, uint64_t srvid, + ctdb_set_message_handler_cb callback, + ctdb_message_fn_t handler, void *private_data) +{ + struct ctdb_request *req; + struct message_handler_info *info; + + info = malloc(sizeof(*info)); + if (!info) { + return NULL; + } + req = new_ctdb_control_request(ctdb, CTDB_CONTROL_REGISTER_SRVID, + CTDB_CURRENT_NODE, NULL, 0); + if (!req) { + free(info); + return NULL; + } + req->hdr.control->srvid = srvid; + + info->srvid = srvid; + info->handler = handler; + info->callback = callback; + info->private_data = private_data; + info->ctdb = ctdb; + + req->callback.register_srvid = set_message_handler; + req->priv_data = info; + + return req; +} + +int ctdb_send_message(struct ctdb_connection *ctdb, + uint32_t pnn, uint64_t srvid, + TDB_DATA data) +{ + struct ctdb_request *req; + struct ctdb_req_message *pkt; + + req = new_ctdb_request(sizeof(*pkt) + data.dsize); + if (!req) { + return -1; + } + + io_elem_init_req_header(req->io, + CTDB_REQ_MESSAGE, pnn, new_reqid(ctdb)); + + /* There's no reply to this, so we mark it cancelled immediately. */ + req->cancelled = true; + + pkt = req->hdr.message; + pkt->srvid = srvid; + pkt->datalen = data.dsize; + memcpy(pkt->data, data.dptr, data.dsize); + DLIST_ADD_END(ctdb->outq, req, struct ctdb_request); + return 0; +} diff --git a/libctdb/messages.h b/libctdb/messages.h new file mode 100644 index 00000000..dcf19c8b --- /dev/null +++ b/libctdb/messages.h @@ -0,0 +1,8 @@ +#ifndef _LIBCTDB_MESSAGE_H +#define _LIBCTDB_MESSAGE_H +struct message_handler_info; +struct ctdb_connection; +struct ctdb_req_header; + +void deliver_message(struct ctdb_connection *ctdb, struct ctdb_req_header *hdr); +#endif /* _LIBCTDB_MESSAGE_H */ diff --git a/libctdb/sync.c b/libctdb/sync.c new file mode 100644 index 00000000..8c4892de --- /dev/null +++ b/libctdb/sync.c @@ -0,0 +1,111 @@ +/* + synchronous wrappers for libctdb + + Copyright (C) Rusty Russell 2010 + + This program is free software; you can redistribute it and/or modify + it under the terms of the GNU General Public License as published by + the Free Software Foundation; either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU General Public License for more details. + + You should have received a copy of the GNU General Public License + along with this program; if not, see . +*/ +#include +#include +#include +#include +#include + +/* FIXME: Find a way to share more code here. */ +struct ctdb_getrecmaster { + bool done; + int status; + uint32_t *recmaster; +}; + +static bool ctdb_service_flush(struct ctdb_connection *ctdb) +{ + struct pollfd fds; + + fds.fd = ctdb_get_fd(ctdb); + fds.events = ctdb_which_events(ctdb); + if (poll(&fds, 1, -1) < 0) { + /* Signalled is OK, other error is bad. */ + return errno == EINTR; + } + return ctdb_service(ctdb, fds.revents) >= 0; +} + +static void getrecmaster_done(int status, uint32_t recmaster, void *priv_data) +{ + struct ctdb_getrecmaster *grm = priv_data; + *grm->recmaster = recmaster; + grm->status = status; + grm->done = true; +} + +int ctdb_getrecmaster(struct ctdb_connection *ctdb, + uint32_t destnode, uint32_t *recmaster) +{ + struct ctdb_request *req; + struct ctdb_getrecmaster grm; + + grm.done = false; + grm.recmaster = recmaster; + req = ctdb_getrecmaster_send(ctdb, destnode, getrecmaster_done, &grm); + if (!req) + return -1; + + while (!grm.done) { + if (!ctdb_service_flush(ctdb)) { + ctdb_cancel(req); + return -1; + } + } + return grm.status; +} + +struct ctdb_attachdb { + bool done; + int status; + struct ctdb_db *ctdb_db; +}; + +static void attachdb_sync_done(int status, + struct ctdb_db *ctdb_db, void *private_data) +{ + struct ctdb_attachdb *atb = private_data; + atb->ctdb_db = ctdb_db; + atb->status = status; + atb->done = true; +} + +struct ctdb_db *ctdb_attachdb(struct ctdb_connection *ctdb, + const char *name, int persistent, + uint32_t tdb_flags) +{ + struct ctdb_request *req; + struct ctdb_attachdb atb; + + atb.done = false; + req = ctdb_attachdb_send(ctdb, name, persistent, tdb_flags, + attachdb_sync_done, &atb); + if (!req) + return NULL; + + while (!atb.done) { + if (!ctdb_service_flush(ctdb)) { + ctdb_cancel(req); + return NULL; + } + } + if (atb.status != 0) + return NULL; + return atb.ctdb_db; +} diff --git a/libctdb/tst.c b/libctdb/tst.c index 8b51f998..ab4b3c70 100644 --- a/libctdb/tst.c +++ b/libctdb/tst.c @@ -4,6 +4,7 @@ #include #include #include +#include #include "lib/tdb/include/tdb.h" #include "include/ctdb.h" @@ -12,9 +13,11 @@ void msg_h(struct ctdb_connection *ctdb, uint64_t srvid, TDB_DATA data, void *pr printf("Message received on port %d : %s\n", (int)srvid, data.dptr); } +static bool registered = false; void message_handler_cb(int status, void *private_data) { printf("Message handler registered: %i\n", status); + registered = true; } void rm_cb(int status, uint32_t recmaster, void *private_data) @@ -42,6 +45,11 @@ int main(int argc, char *argv[]) exit(10); } + /* Hack for testing: this makes sure registration goes out. */ + while (!registered) { + ctdb_service(ctdb_connection, POLLIN|POLLOUT); + } + msg.dptr="HelloWorld"; msg.dsize = strlen(msg.dptr);