r20889: import ctdb cluster backend from bzr
authorAndrew Tridgell <tridge@samba.org>
Fri, 19 Jan 2007 03:54:48 +0000 (03:54 +0000)
committerAndrew Tridgell <tridge@samba.org>
Fri, 19 Jan 2007 03:54:48 +0000 (03:54 +0000)
it will be interesting to see how the build farm handles this

17 files changed:
source/cluster/config.mk
source/cluster/ctdb/brlock_ctdb.c [new file with mode: 0644]
source/cluster/ctdb/common/ctdb.c [new file with mode: 0644]
source/cluster/ctdb/common/ctdb_call.c [new file with mode: 0644]
source/cluster/ctdb/common/ctdb_ltdb.c [new file with mode: 0644]
source/cluster/ctdb/common/ctdb_util.c [new file with mode: 0644]
source/cluster/ctdb/config.mk [new file with mode: 0644]
source/cluster/ctdb/ctdb_cluster.c [new file with mode: 0644]
source/cluster/ctdb/ctdb_cluster.h [new file with mode: 0644]
source/cluster/ctdb/include/ctdb.h [new file with mode: 0644]
source/cluster/ctdb/include/ctdb_private.h [new file with mode: 0644]
source/cluster/ctdb/tcp/ctdb_tcp.h [new file with mode: 0644]
source/cluster/ctdb/tcp/tcp_connect.c [new file with mode: 0644]
source/cluster/ctdb/tcp/tcp_init.c [new file with mode: 0644]
source/cluster/ctdb/tcp/tcp_io.c [new file with mode: 0644]
source/cluster/ctdb/tests/ctdb_bench.c [new file with mode: 0644]
source/cluster/ctdb/tests/ctdb_test.c [new file with mode: 0644]

index 934bc55252d7f9c46241df69ad851a11287b9d14..c5c2ea970acdc5033ee8fc28e3f8c703c01c6207 100644 (file)
@@ -1,4 +1,7 @@
+include ctdb/config.mk
 
 ####################
 [SUBSYSTEM::CLUSTER]
-OBJ_FILES = cluster.o
+OBJ_FILES = cluster.o \
+               local.o
+PRIVATE_DEPENDENCIES = ctdb
diff --git a/source/cluster/ctdb/brlock_ctdb.c b/source/cluster/ctdb/brlock_ctdb.c
new file mode 100644 (file)
index 0000000..bcfa566
--- /dev/null
@@ -0,0 +1,971 @@
+/* 
+   Unix SMB/CIFS implementation.
+
+   generic byte range locking code - ctdb backend
+
+   Copyright (C) Andrew Tridgell 2006
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+#include "system/filesys.h"
+#include "lib/tdb/include/tdb.h"
+#include "messaging/messaging.h"
+#include "db_wrap.h"
+#include "lib/messaging/irpc.h"
+#include "libcli/libcli.h"
+#include "cluster/cluster.h"
+#include "ntvfs/common/brlock.h"
+#include "cluster/ctdb/include/ctdb.h"
+
+enum my_functions {FUNC_BRL_LOCK=1, FUNC_BRL_UNLOCK=2, 
+                  FUNC_BRL_REMOVE_PENDING=3, FUNC_BRL_LOCKTEST=4,
+                  FUNC_BRL_CLOSE=5};
+
+/*
+  in this module a "DATA_BLOB *file_key" is a blob that uniquely identifies
+  a file. For a local posix filesystem this will usually be a combination
+  of the device and inode numbers of the file, but it can be anything 
+  that uniquely idetifies a file for locking purposes, as long
+  as it is applied consistently.
+*/
+
+/* this struct is typically attached to tcon */
+struct brl_context {
+       struct ctdb_context *ctdb;
+       struct server_id server;
+       struct messaging_context *messaging_ctx;
+};
+
+/*
+  the lock context contains the elements that define whether one
+  lock is the same as another lock
+*/
+struct lock_context {
+       struct server_id server;
+       uint16_t smbpid;
+       struct brl_context *ctx;
+};
+
+/* The data in brlock records is an unsorted linear array of these
+   records.  It is unnecessary to store the count as tdb provides the
+   size of the record */
+struct lock_struct {
+       struct lock_context context;
+       struct ntvfs_handle *ntvfs;
+       uint64_t start;
+       uint64_t size;
+       enum brl_type lock_type;
+       void *notify_ptr;
+};
+
+/* this struct is attached to on open file handle */
+struct brl_handle {
+       DATA_BLOB key;
+       struct ntvfs_handle *ntvfs;
+       struct lock_struct last_lock;
+};
+
+/*
+  Open up the brlock.tdb database. Close it down using
+  talloc_free(). We need the messaging_ctx to allow for
+  pending lock notifications.
+*/
+static struct brl_context *brl_ctdb_init(TALLOC_CTX *mem_ctx, struct server_id server, 
+                                   struct messaging_context *messaging_ctx)
+{
+       struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context);
+       struct brl_context *brl;
+
+       brl = talloc(mem_ctx, struct brl_context);
+       if (brl == NULL) {
+               return NULL;
+       }
+
+       brl->ctdb = ctdb;
+       brl->server = server;
+       brl->messaging_ctx = messaging_ctx;
+
+       DEBUG(0,("brl_ctdb_init: brl=%p\n", brl));
+
+       return brl;
+}
+
+static struct brl_handle *brl_ctdb_create_handle(TALLOC_CTX *mem_ctx, struct ntvfs_handle *ntvfs, 
+                                                   DATA_BLOB *file_key)
+{
+       struct brl_handle *brlh;
+
+       brlh = talloc(mem_ctx, struct brl_handle);
+       if (brlh == NULL) {
+               return NULL;
+       }
+
+       brlh->key = *file_key;
+       brlh->ntvfs = ntvfs;
+       ZERO_STRUCT(brlh->last_lock);
+
+       return brlh;
+}
+
+/*
+  see if two locking contexts are equal
+*/
+static BOOL brl_ctdb_same_context(struct lock_context *ctx1, struct lock_context *ctx2)
+{
+       return (cluster_id_equal(&ctx1->server, &ctx2->server) &&
+               ctx1->smbpid == ctx2->smbpid &&
+               ctx1->ctx == ctx2->ctx);
+}
+
+/*
+  see if lck1 and lck2 overlap
+*/
+static BOOL brl_ctdb_overlap(struct lock_struct *lck1, 
+                       struct lock_struct *lck2)
+{
+       /* this extra check is not redundent - it copes with locks
+          that go beyond the end of 64 bit file space */
+       if (lck1->size != 0 &&
+           lck1->start == lck2->start &&
+           lck1->size == lck2->size) {
+               return True;
+       }
+           
+       if (lck1->start >= (lck2->start+lck2->size) ||
+           lck2->start >= (lck1->start+lck1->size)) {
+               return False;
+       }
+       return True;
+} 
+
+/*
+ See if lock2 can be added when lock1 is in place.
+*/
+static BOOL brl_ctdb_conflict(struct lock_struct *lck1, 
+                        struct lock_struct *lck2)
+{
+       /* pending locks don't conflict with anything */
+       if (lck1->lock_type >= PENDING_READ_LOCK ||
+           lck2->lock_type >= PENDING_READ_LOCK) {
+               return False;
+       }
+
+       if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) {
+               return False;
+       }
+
+       if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
+           lck2->lock_type == READ_LOCK && lck1->ntvfs == lck2->ntvfs) {
+               return False;
+       }
+
+       return brl_ctdb_overlap(lck1, lck2);
+} 
+
+
+/*
+ Check to see if this lock conflicts, but ignore our own locks on the
+ same fnum only.
+*/
+static BOOL brl_ctdb_conflict_other(struct lock_struct *lck1, struct lock_struct *lck2)
+{
+       /* pending locks don't conflict with anything */
+       if (lck1->lock_type >= PENDING_READ_LOCK ||
+           lck2->lock_type >= PENDING_READ_LOCK) {
+               return False;
+       }
+
+       if (lck1->lock_type == READ_LOCK && lck2->lock_type == READ_LOCK) 
+               return False;
+
+       /*
+        * note that incoming write calls conflict with existing READ
+        * locks even if the context is the same. JRA. See LOCKTEST7
+        * in smbtorture.
+        */
+       if (brl_ctdb_same_context(&lck1->context, &lck2->context) &&
+           lck1->ntvfs == lck2->ntvfs &&
+           (lck2->lock_type == READ_LOCK || lck1->lock_type == WRITE_LOCK)) {
+               return False;
+       }
+
+       return brl_ctdb_overlap(lck1, lck2);
+} 
+
+
+/*
+  amazingly enough, w2k3 "remembers" whether the last lock failure
+  is the same as this one and changes its error code. I wonder if any
+  app depends on this?
+*/
+static NTSTATUS brl_ctdb_lock_failed(struct brl_handle *brlh, struct lock_struct *lock)
+{
+       /*
+        * this function is only called for non pending lock!
+        */
+
+       /* 
+        * if the notify_ptr is non NULL,
+        * it means that we're at the end of a pending lock
+        * and the real lock is requested after the timeout went by
+        * In this case we need to remember the last_lock and always
+        * give FILE_LOCK_CONFLICT
+        */
+       if (lock->notify_ptr) {
+               brlh->last_lock = *lock;
+               return NT_STATUS_FILE_LOCK_CONFLICT;
+       }
+
+       /* 
+        * amazing the little things you learn with a test
+        * suite. Locks beyond this offset (as a 64 bit
+        * number!) always generate the conflict error code,
+        * unless the top bit is set
+        */
+       if (lock->start >= 0xEF000000 && (lock->start >> 63) == 0) {
+               brlh->last_lock = *lock;
+               return NT_STATUS_FILE_LOCK_CONFLICT;
+       }
+
+       /*
+        * if the current lock matches the last failed lock on the file handle
+        * and starts at the same offset, then FILE_LOCK_CONFLICT should be returned
+        */
+       if (cluster_id_equal(&lock->context.server, &brlh->last_lock.context.server) &&
+           lock->context.ctx == brlh->last_lock.context.ctx &&
+           lock->ntvfs == brlh->last_lock.ntvfs &&
+           lock->start == brlh->last_lock.start) {
+               return NT_STATUS_FILE_LOCK_CONFLICT;
+       }
+
+       brlh->last_lock = *lock;
+       return NT_STATUS_LOCK_NOT_GRANTED;
+}
+
+
+static void show_locks(const char *op, struct lock_struct *locks, int count)
+{
+       int i;
+       DEBUG(0,("OP: %s\n", op));
+       for (i=0;i<count;i++) {
+               DEBUG(0,("%2d: %4d %4d %d.%d.%d %p %p\n",
+                        i, (int)locks[i].start, (int)locks[i].size, 
+                        locks[i].context.server.node,
+                        locks[i].context.server.id,
+                        locks[i].context.smbpid,
+                        locks[i].context.ctx,
+                        locks[i].ntvfs));
+       }
+}
+
+
+struct ctdb_lock_req {
+       uint16_t smbpid;
+       uint64_t start;
+       uint64_t size;
+       enum brl_type lock_type;
+       void *notify_ptr;
+       struct server_id server;
+       struct brl_context *brl;
+       struct ntvfs_handle *ntvfs;
+};
+
+/*
+  ctdb call handling brl_lock()
+*/
+static int brl_ctdb_lock_func(struct ctdb_call *call)
+{
+       struct ctdb_lock_req *req = (struct ctdb_lock_req *)call->call_data->dptr;
+       TDB_DATA dbuf;
+       int count=0, i;
+       struct lock_struct lock, *locks=NULL;
+       NTSTATUS status = NT_STATUS_OK;
+
+#if 0
+       /* if this is a pending lock, then with the chainlock held we
+          try to get the real lock. If we succeed then we don't need
+          to make it pending. This prevents a possible race condition
+          where the pending lock gets created after the lock that is
+          preventing the real lock gets removed */
+       if (lock_type >= PENDING_READ_LOCK) {
+               enum brl_type rw = (lock_type==PENDING_READ_LOCK? READ_LOCK : WRITE_LOCK);
+
+               /* here we need to force that the last_lock isn't overwritten */
+               lock = brlh->last_lock;
+               status = brl_ctdb_lock(brl, brlh, smbpid, start, size, rw, NULL);
+               brlh->last_lock = lock;
+
+               if (NT_STATUS_IS_OK(status)) {
+                       tdb_chainunlock(brl->w->tdb, kbuf);
+                       return NT_STATUS_OK;
+               }
+       }
+#endif
+
+       dbuf = call->record_data;
+
+       ZERO_STRUCT(lock);
+       lock.context.smbpid = req->smbpid;
+       lock.context.server = req->server;
+       lock.context.ctx = req->brl;
+       lock.ntvfs = req->ntvfs;
+       lock.start = req->start;
+       lock.size = req->size;
+       lock.lock_type = req->lock_type;
+       lock.notify_ptr = req->notify_ptr;
+
+       {
+               int xlen = sizeof(lock);
+               uint8_t *xx = &lock;
+               int ii, fd = open("/dev/null", O_WRONLY);
+               for (ii=0;ii<xlen;ii++) {
+                       write(fd, &xx[ii], 1);
+               }
+               close(fd);
+       }
+
+       if (dbuf.dptr) {
+               /* there are existing locks - make sure they don't conflict */
+               locks = (struct lock_struct *)dbuf.dptr;
+               count = dbuf.dsize / sizeof(*locks);
+
+               show_locks("lock", locks, count);
+
+               for (i=0; i<count; i++) {
+                       if (brl_ctdb_conflict(&locks[i], &lock)) {
+                               status = NT_STATUS_LOCK_NOT_GRANTED;
+                               goto reply;
+                       }
+               }
+       }
+
+       call->new_data = talloc(call, TDB_DATA);
+       if (call->new_data == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+
+       call->new_data->dptr = talloc_size(call, dbuf.dsize + sizeof(lock));
+       if (call->new_data->dptr == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       memcpy(call->new_data->dptr, locks, dbuf.dsize);
+       memcpy(call->new_data->dptr+dbuf.dsize, &lock, sizeof(lock));
+       call->new_data->dsize = dbuf.dsize + sizeof(lock);
+
+       if (req->lock_type >= PENDING_READ_LOCK) {
+               status = NT_STATUS_LOCK_NOT_GRANTED;
+       }
+
+       DEBUG(0,("lock: size now %d\n", call->new_data->dsize));
+
+reply:
+       call->reply_data = talloc(call, TDB_DATA);
+       if (call->reply_data == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+
+       call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+       call->reply_data->dsize = sizeof(NTSTATUS);
+       if (call->reply_data->dptr == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       *(NTSTATUS *)call->reply_data->dptr = status;
+
+       return 0;
+}
+
+
+/*
+  Lock a range of bytes.  The lock_type can be a PENDING_*_LOCK, in
+  which case a real lock is first tried, and if that fails then a
+  pending lock is created. When the pending lock is triggered (by
+  someone else closing an overlapping lock range) a messaging
+  notification is sent, identified by the notify_ptr
+*/
+static NTSTATUS brl_ctdb_lock(struct brl_context *brl,
+                             struct brl_handle *brlh,
+                             uint16_t smbpid,
+                             uint64_t start, uint64_t size, 
+                             enum brl_type lock_type,
+                             void *notify_ptr)
+{
+       TDB_DATA kbuf, rbuf, sbuf;
+       struct ctdb_lock_req req;
+       NTSTATUS status;
+       int ret;
+
+       kbuf.dptr = brlh->key.data;
+       kbuf.dsize = brlh->key.length;
+
+       rbuf.dptr = (uint8_t *)&req;
+       rbuf.dsize = sizeof(req);
+
+       ZERO_STRUCT(req);
+       req.smbpid = smbpid;
+       req.start  = start;
+       req.size   = size;
+       req.lock_type = lock_type;
+       req.notify_ptr = notify_ptr;
+       req.server = brl->server;
+       req.brl = brl;
+       req.ntvfs = brlh->ntvfs;
+               
+       ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_LOCK, &rbuf, &sbuf);
+       if (ret == -1) {
+               DEBUG(0,("ctdb_call failed - %s\n", __location__));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+
+       status = *(NTSTATUS *)sbuf.dptr;
+       talloc_free(sbuf.dptr);
+
+       return status;
+}
+
+#if 0
+/*
+  we are removing a lock that might be holding up a pending lock. Scan for pending
+  locks that cover this range and if we find any then notify the server that it should
+  retry the lock
+*/
+static void brl_ctdb_notify_unlock(struct brl_context *brl,
+                             struct lock_struct *locks, int count, 
+                             struct lock_struct *removed_lock)
+{
+       int i, last_notice;
+
+       /* the last_notice logic is to prevent stampeding on a lock
+          range. It prevents us sending hundreds of notifies on the
+          same range of bytes. It doesn't prevent all possible
+          stampedes, but it does prevent the most common problem */
+       last_notice = -1;
+
+       for (i=0;i<count;i++) {
+               if (locks[i].lock_type >= PENDING_READ_LOCK &&
+                   brl_ctdb_overlap(&locks[i], removed_lock)) {
+                       if (last_notice != -1 && brl_ctdb_overlap(&locks[i], &locks[last_notice])) {
+                               continue;
+                       }
+                       if (locks[i].lock_type == PENDING_WRITE_LOCK) {
+                               last_notice = i;
+                       }
+                       messaging_send_ptr(brl->messaging_ctx, locks[i].context.server, 
+                                          MSG_BRL_RETRY, locks[i].notify_ptr);
+               }
+       }
+}
+#endif
+
+/*
+  send notifications for all pending locks - the file is being closed by this
+  user
+*/
+static void brl_ctdb_notify_all(struct brl_context *brl,
+                          struct lock_struct *locks, int count)
+{
+       int i;
+       for (i=0;i<count;i++) {
+               if (locks->lock_type >= PENDING_READ_LOCK) {
+//                     brl_ctdb_notify_unlock(brl, locks, count, &locks[i]);
+               }
+       }
+}
+
+struct ctdb_unlock_req {
+       uint16_t smbpid;
+       uint64_t start;
+       uint64_t size;
+       struct server_id server;
+       struct brl_context *brl;
+       struct ntvfs_handle *ntvfs;
+};
+
+/*
+ Unlock a range of bytes.
+*/
+static int brl_ctdb_unlock_func(struct ctdb_call *call)
+{
+       struct ctdb_unlock_req *req = (struct ctdb_unlock_req *)call->call_data->dptr;
+       TDB_DATA dbuf;
+       int count, i;
+       struct lock_struct *locks;
+       struct lock_context context;
+       NTSTATUS status = NT_STATUS_OK;
+
+       dbuf = call->record_data;
+
+       context.smbpid = req->smbpid;
+       context.server = req->server;
+       context.ctx = req->brl;
+
+       /* there are existing locks - find a match */
+       locks = (struct lock_struct *)dbuf.dptr;
+       count = dbuf.dsize / sizeof(*locks);
+
+       show_locks("unlock", locks, count);
+
+       for (i=0; i<count; i++) {
+               struct lock_struct *lock = &locks[i];
+               
+               if (brl_ctdb_same_context(&lock->context, &context) &&
+                   lock->ntvfs == req->ntvfs &&
+                   lock->start == req->start &&
+                   lock->size == req->size &&
+                   lock->lock_type < PENDING_READ_LOCK) {
+//                     struct lock_struct removed_lock = *lock;
+
+                       call->new_data = talloc(call, TDB_DATA);
+                       if (call->new_data == NULL) {
+                               return CTDB_ERR_NOMEM;
+                       }
+
+                       call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(lock));
+                       if (call->new_data->dptr == NULL) {
+                               return CTDB_ERR_NOMEM;
+                       }
+                       call->new_data->dsize = dbuf.dsize - sizeof(lock);
+
+                       memcpy(call->new_data->dptr, locks, i*sizeof(lock));
+                       memcpy(call->new_data->dptr+i*sizeof(lock), locks+i+1,
+                              (count-(i+1))*sizeof(lock));
+                       
+                       if (count > 1) {
+                               /* send notifications for any relevant pending locks */
+//                             brl_ctdb_notify_unlock(req->brl, locks, count, &removed_lock);
+                       }
+                       break;
+               }
+       }
+
+       if (call->new_data) {
+               DEBUG(0,("unlock: size now %d\n", call->new_data->dsize));
+       }
+       
+       if (i == count) {
+               /* we didn't find it */
+               status = NT_STATUS_RANGE_NOT_LOCKED;
+       }
+
+       call->reply_data = talloc(call, TDB_DATA);
+       if (call->reply_data == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+
+       call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+       call->reply_data->dsize = sizeof(NTSTATUS);
+       if (call->reply_data->dptr == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       *(NTSTATUS *)call->reply_data->dptr = status;
+
+       return 0;
+}
+
+
+/*
+ Unlock a range of bytes.
+*/
+static NTSTATUS brl_ctdb_unlock(struct brl_context *brl,
+                               struct brl_handle *brlh, 
+                               uint16_t smbpid,
+                               uint64_t start, uint64_t size)
+{
+       TDB_DATA kbuf, rbuf, sbuf;
+       struct ctdb_unlock_req req;
+       NTSTATUS status;
+       int ret;
+
+       kbuf.dptr = brlh->key.data;
+       kbuf.dsize = brlh->key.length;
+
+       rbuf.dptr = (uint8_t *)&req;
+       rbuf.dsize = sizeof(req);
+
+       ZERO_STRUCT(req);
+       req.smbpid = smbpid;
+       req.start  = start;
+       req.size   = size;
+       req.server = brl->server;
+       req.brl = brl;
+       req.ntvfs = brlh->ntvfs;
+               
+       ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_UNLOCK, &rbuf, &sbuf);
+       if (ret == -1) {
+               DEBUG(0,("ctdb_call failed - %s\n", __location__));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+
+       status = *(NTSTATUS *)sbuf.dptr;
+       talloc_free(sbuf.dptr);
+
+       return status;
+}
+
+
+struct ctdb_remove_pending_req {
+       struct server_id server;
+       void *notify_ptr;
+};
+
+/*
+  remove a pending lock. This is called when the caller has either
+  given up trying to establish a lock or when they have succeeded in
+  getting it. In either case they no longer need to be notified.
+*/
+static int brl_ctdb_remove_pending_func(struct ctdb_call *call)
+{
+       struct ctdb_remove_pending_req *req = (struct ctdb_remove_pending_req *)call->call_data->dptr;
+       TDB_DATA dbuf;
+       int count, i;
+       struct lock_struct *locks;
+       NTSTATUS status = NT_STATUS_OK;
+
+       dbuf = call->record_data;
+
+       /* there are existing locks - find a match */
+       locks = (struct lock_struct *)dbuf.dptr;
+       count = dbuf.dsize / sizeof(*locks);
+
+       show_locks("remove_pending", locks, count);
+
+       for (i=0; i<count; i++) {
+               struct lock_struct *lock = &locks[i];
+               
+               if (lock->lock_type >= PENDING_READ_LOCK &&
+                   lock->notify_ptr == req->notify_ptr &&
+                   cluster_id_equal(&lock->context.server, &req->server)) {
+                       call->new_data = talloc(call, TDB_DATA);
+                       if (call->new_data == NULL) {
+                               return CTDB_ERR_NOMEM;
+                       }
+
+                       call->new_data->dptr = talloc_size(call, dbuf.dsize - sizeof(lock));
+                       if (call->new_data->dptr == NULL) {
+                               return CTDB_ERR_NOMEM;
+                       }
+                       call->new_data->dsize = dbuf.dsize - sizeof(lock);
+
+                       memcpy(call->new_data->dptr, locks, i*sizeof(lock));
+                       memcpy(call->new_data->dptr+i*sizeof(lock), locks+i+1,
+                              (count-(i+1))*sizeof(lock));
+                       break;
+               }
+       }
+       
+       if (call->new_data) {
+               DEBUG(0,("remove_pending: size now %d\n", call->new_data->dsize));
+       }
+
+       if (i == count) {
+               /* we didn't find it */
+               status = NT_STATUS_RANGE_NOT_LOCKED;
+       }
+
+       call->reply_data = talloc(call, TDB_DATA);
+       if (call->reply_data == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+
+       call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+       call->reply_data->dsize = sizeof(NTSTATUS);
+       if (call->reply_data->dptr == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       *(NTSTATUS *)call->reply_data->dptr = status;
+
+       return 0;
+}
+
+static NTSTATUS brl_ctdb_remove_pending(struct brl_context *brl,
+                                       struct brl_handle *brlh, 
+                                       void *notify_ptr)
+{
+       TDB_DATA kbuf, rbuf, sbuf;
+       struct ctdb_remove_pending_req req;
+       NTSTATUS status;
+       int ret;
+
+       kbuf.dptr = brlh->key.data;
+       kbuf.dsize = brlh->key.length;
+
+       rbuf.dptr = (uint8_t *)&req;
+       rbuf.dsize = sizeof(req);
+
+       ZERO_STRUCT(req);
+       req.notify_ptr = notify_ptr;
+       req.server = brl->server;
+               
+       ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_REMOVE_PENDING, &rbuf, &sbuf);
+       if (ret == -1) {
+               DEBUG(0,("ctdb_call failed - %s\n", __location__));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+
+       status = *(NTSTATUS *)sbuf.dptr;
+       talloc_free(sbuf.dptr);
+
+       return status;
+}
+
+
+struct ctdb_locktest_req {
+       uint16_t smbpid;
+       uint64_t start;
+       uint64_t size;
+       enum brl_type lock_type;
+       struct brl_context *brl;
+       struct server_id server;
+       struct ntvfs_handle *ntvfs;
+};
+
+/*
+  remove a pending lock. This is called when the caller has either
+  given up trying to establish a lock or when they have succeeded in
+  getting it. In either case they no longer need to be notified.
+*/
+static int brl_ctdb_locktest_func(struct ctdb_call *call)
+{
+       struct ctdb_locktest_req *req = (struct ctdb_locktest_req *)call->call_data->dptr;
+       TDB_DATA dbuf;
+       int count, i;
+       struct lock_struct *locks, lock;
+       NTSTATUS status = NT_STATUS_OK;
+
+       lock.context.smbpid = req->smbpid;
+       lock.context.server = req->server;
+       lock.context.ctx = req->brl;
+       lock.ntvfs = req->ntvfs;
+       lock.start = req->start;
+       lock.size = req->size;
+       lock.lock_type = req->lock_type;
+
+       dbuf = call->record_data;
+
+       /* there are existing locks - find a match */
+       locks = (struct lock_struct *)dbuf.dptr;
+       count = dbuf.dsize / sizeof(*locks);
+
+       show_locks("locktest", locks, count);
+
+       for (i=0; i<count; i++) {
+               if (brl_ctdb_conflict_other(&locks[i], &lock)) {
+                       status = NT_STATUS_FILE_LOCK_CONFLICT;
+                       break;
+               }
+       }
+       
+       call->reply_data = talloc(call, TDB_DATA);
+       if (call->reply_data == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+
+       call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+       call->reply_data->dsize = sizeof(NTSTATUS);
+       if (call->reply_data->dptr == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       *(NTSTATUS *)call->reply_data->dptr = status;
+
+       return 0;
+}
+
+/*
+  Test if we are allowed to perform IO on a region of an open file
+*/
+static NTSTATUS brl_ctdb_locktest(struct brl_context *brl,
+                                 struct brl_handle *brlh,
+                                 uint16_t smbpid, 
+                                 uint64_t start, uint64_t size, 
+                                 enum brl_type lock_type)
+{
+       TDB_DATA kbuf, rbuf, sbuf;
+       struct ctdb_locktest_req req;
+       NTSTATUS status;
+       int ret;
+
+       kbuf.dptr = brlh->key.data;
+       kbuf.dsize = brlh->key.length;
+
+       rbuf.dptr = (uint8_t *)&req;
+       rbuf.dsize = sizeof(req);
+
+       ZERO_STRUCT(req);
+       req.smbpid = smbpid;
+       req.start  = start;
+       req.size   = size;
+       req.lock_type = lock_type;
+       req.server = brl->server;
+       req.brl = brl;
+       req.ntvfs = brlh->ntvfs;
+
+       ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_LOCKTEST, &rbuf, &sbuf);
+       if (ret == -1) {
+               DEBUG(0,("ctdb_call failed - %s\n", __location__));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+
+       status = *(NTSTATUS *)sbuf.dptr;
+       talloc_free(sbuf.dptr);
+
+       return status;
+}
+
+
+struct ctdb_close_req {
+       struct brl_context *brl;
+       struct server_id server;
+       struct ntvfs_handle *ntvfs;
+};
+
+/*
+  remove a pending lock. This is called when the caller has either
+  given up trying to establish a lock or when they have succeeded in
+  getting it. In either case they no longer need to be notified.
+*/
+static int brl_ctdb_close_func(struct ctdb_call *call)
+{
+       struct ctdb_close_req *req = (struct ctdb_close_req *)call->call_data->dptr;
+       TDB_DATA dbuf;
+       int count, dcount=0, i;
+       struct lock_struct *locks;
+       NTSTATUS status = NT_STATUS_OK;
+
+       dbuf = call->record_data;
+
+       /* there are existing locks - find a match */
+       locks = (struct lock_struct *)dbuf.dptr;
+       count = dbuf.dsize / sizeof(*locks);
+
+       show_locks("close", locks, count);
+
+       DEBUG(0,("closing ctx=%p server=%d.%d ntvfs=%p\n",
+                req->brl, req->server.node, req->server.id, req->ntvfs));
+
+       for (i=0; i<count; i++) {
+               struct lock_struct *lock = &locks[i];
+
+               if (lock->context.ctx == req->brl &&
+                   cluster_id_equal(&lock->context.server, &req->server) &&
+                   lock->ntvfs == req->ntvfs) {
+                       /* found it - delete it */
+                       if (count > 1 && i < count-1) {
+                               memmove(&locks[i], &locks[i+1], 
+                                       sizeof(*locks)*((count-1) - i));
+                       }
+                       count--;
+                       i--;
+                       dcount++;
+               }
+       }
+
+       if (dcount > 0) {
+               call->new_data = talloc(call, TDB_DATA);
+               if (call->new_data == NULL) {
+                       return CTDB_ERR_NOMEM;
+               }
+               
+               call->new_data->dptr = talloc_size(call, count*sizeof(struct lock_struct));
+               if (call->new_data->dptr == NULL) {
+                       return CTDB_ERR_NOMEM;
+               }
+               call->new_data->dsize = count*sizeof(struct lock_struct);
+
+               memcpy(call->new_data->dptr, locks, count*sizeof(struct lock_struct));
+       }
+
+       if (call->new_data) {
+               DEBUG(0,("close: size now %d\n", call->new_data->dsize));
+       }
+
+       DEBUG(0,("brl_ctdb_close_func dcount=%d count=%d\n", dcount, count));
+       
+       call->reply_data = talloc(call, TDB_DATA);
+       if (call->reply_data == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+
+       call->reply_data->dptr = talloc_size(call, sizeof(NTSTATUS));
+       call->reply_data->dsize = sizeof(NTSTATUS);
+       if (call->reply_data->dptr == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       *(NTSTATUS *)call->reply_data->dptr = status;
+
+       return 0;
+}
+
+/*
+  Test if we are allowed to perform IO on a region of an open file
+*/
+static NTSTATUS brl_ctdb_close(struct brl_context *brl,
+                              struct brl_handle *brlh)
+{
+       TDB_DATA kbuf, rbuf, sbuf;
+       struct ctdb_close_req req;
+       NTSTATUS status;
+       int ret;
+
+       kbuf.dptr = brlh->key.data;
+       kbuf.dsize = brlh->key.length;
+
+       rbuf.dptr = (uint8_t *)&req;
+       rbuf.dsize = sizeof(req);
+
+       ZERO_STRUCT(req);
+       req.brl = brl;
+       req.server = brl->server;
+       req.ntvfs = brlh->ntvfs;
+
+       DEBUG(0,("brl_ctdb_close %u.%u %p\n", req.server.node, req.server.id, brl));
+
+       ret = ctdb_call(brl->ctdb, kbuf, FUNC_BRL_CLOSE, &rbuf, &sbuf);
+       if (ret == -1) {
+               DEBUG(0,("ctdb_call failed - %s\n", __location__));
+               return NT_STATUS_INTERNAL_DB_CORRUPTION;
+       }
+
+       status = *(NTSTATUS *)sbuf.dptr;
+       talloc_free(sbuf.dptr);
+
+       return status;
+}
+
+
+static const struct brlock_ops brlock_tdb_ops = {
+       .brl_init           = brl_ctdb_init,
+       .brl_create_handle  = brl_ctdb_create_handle,
+       .brl_lock           = brl_ctdb_lock,
+       .brl_unlock         = brl_ctdb_unlock,
+       .brl_remove_pending = brl_ctdb_remove_pending,
+       .brl_locktest       = brl_ctdb_locktest,
+       .brl_close          = brl_ctdb_close
+};
+
+
+void brl_ctdb_init_ops(void)
+{
+       struct ctdb_context *ctdb = talloc_get_type(cluster_private(), struct ctdb_context);
+
+       brl_set_ops(&brlock_tdb_ops);
+
+       ctdb_set_call(ctdb, brl_ctdb_lock_func,  FUNC_BRL_LOCK);
+       ctdb_set_call(ctdb, brl_ctdb_unlock_func,  FUNC_BRL_UNLOCK);
+       ctdb_set_call(ctdb, brl_ctdb_remove_pending_func,  FUNC_BRL_REMOVE_PENDING);
+       ctdb_set_call(ctdb, brl_ctdb_locktest_func,  FUNC_BRL_LOCKTEST);
+       ctdb_set_call(ctdb, brl_ctdb_close_func,  FUNC_BRL_CLOSE);
+
+}
diff --git a/source/cluster/ctdb/common/ctdb.c b/source/cluster/ctdb/common/ctdb.c
new file mode 100644 (file)
index 0000000..ad0345b
--- /dev/null
@@ -0,0 +1,287 @@
+/* 
+   ctdb main protocol code
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+/*
+  choose the transport we will use
+*/
+int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport)
+{
+       int ctdb_tcp_init(struct ctdb_context *ctdb);
+
+       if (strcmp(transport, "tcp") == 0) {
+               return ctdb_tcp_init(ctdb);
+       }
+       ctdb_set_error(ctdb, "Unknown transport '%s'\n", transport);
+       return -1;
+}
+
+/*
+  set some ctdb flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags)
+{
+       ctdb->flags |= flags;
+}
+
+
+/*
+  add a node to the list of active nodes
+*/
+static int ctdb_add_node(struct ctdb_context *ctdb, char *nstr)
+{
+       struct ctdb_node *node, **nodep;
+
+       nodep = talloc_realloc(ctdb, ctdb->nodes, struct ctdb_node *, ctdb->num_nodes+1);
+       CTDB_NO_MEMORY(ctdb, nodep);
+
+       ctdb->nodes = nodep;
+       nodep = &ctdb->nodes[ctdb->num_nodes];
+       (*nodep) = talloc_zero(ctdb->nodes, struct ctdb_node);
+       CTDB_NO_MEMORY(ctdb, *nodep);
+       node = *nodep;
+
+       if (ctdb_parse_address(ctdb, node, nstr, &node->address) != 0) {
+               return -1;
+       }
+       node->ctdb = ctdb;
+       node->name = talloc_asprintf(node, "%s:%u", 
+                                    node->address.address, 
+                                    node->address.port);
+       /* for now we just set the vnn to the line in the file - this
+          will change! */
+       node->vnn = ctdb->num_nodes;
+
+       if (ctdb->methods->add_node(node) != 0) {
+               talloc_free(node);
+               return -1;
+       }
+
+       if (ctdb_same_address(&ctdb->address, &node->address)) {
+               ctdb->vnn = node->vnn;
+       }
+
+       ctdb->num_nodes++;
+
+       return 0;
+}
+
+/*
+  setup the node list from a file
+*/
+int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist)
+{
+       char **lines;
+       int nlines;
+       int i;
+
+       lines = file_lines_load(nlist, &nlines, ctdb);
+       if (lines == NULL) {
+               ctdb_set_error(ctdb, "Failed to load nlist '%s'\n", nlist);
+               return -1;
+       }
+
+       for (i=0;i<nlines;i++) {
+               if (ctdb_add_node(ctdb, lines[i]) != 0) {
+                       talloc_free(lines);
+                       return -1;
+               }
+       }
+       
+       talloc_free(lines);
+       return 0;
+}
+
+/*
+  setup the local node address
+*/
+int ctdb_set_address(struct ctdb_context *ctdb, const char *address)
+{
+       if (ctdb_parse_address(ctdb, ctdb, address, &ctdb->address) != 0) {
+               return -1;
+       }
+       
+       ctdb->name = talloc_asprintf(ctdb, "%s:%u", 
+                                    ctdb->address.address, 
+                                    ctdb->address.port);
+       return 0;
+}
+
+/*
+  add a node to the list of active nodes
+*/
+int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id)
+{
+       struct ctdb_registered_call *call;
+
+       call = talloc(ctdb, struct ctdb_registered_call);
+       call->fn = fn;
+       call->id = id;
+
+       DLIST_ADD(ctdb->calls, call);   
+       return 0;
+}
+
+/*
+  return the vnn of this node
+*/
+uint32_t ctdb_get_vnn(struct ctdb_context *ctdb)
+{
+       return ctdb->vnn;
+}
+
+/*
+  start the protocol going
+*/
+int ctdb_start(struct ctdb_context *ctdb)
+{
+       return ctdb->methods->start(ctdb);
+}
+
+/*
+  called by the transport layer when a packet comes in
+*/
+static void ctdb_recv_pkt(struct ctdb_context *ctdb, uint8_t *data, uint32_t length)
+{
+       struct ctdb_req_header *hdr;
+       if (length < sizeof(*hdr)) {
+               ctdb_set_error(ctdb, "Bad packet length %d\n", length);
+               return;
+       }
+       hdr = (struct ctdb_req_header *)data;
+       if (length != hdr->length) {
+               ctdb_set_error(ctdb, "Bad header length %d expected %d\n", 
+                              hdr->length, length);
+               return;
+       }
+
+       DEBUG(0,("got ctdb op %d reqid %d\n", hdr->operation, hdr->reqid));
+
+       switch (hdr->operation) {
+       case CTDB_REQ_CALL:
+               ctdb_request_call(ctdb, hdr);
+               break;
+
+       case CTDB_REPLY_CALL:
+               ctdb_reply_call(ctdb, hdr);
+               break;
+
+       case CTDB_REPLY_ERROR:
+               ctdb_reply_error(ctdb, hdr);
+               break;
+
+       case CTDB_REPLY_REDIRECT:
+               ctdb_reply_redirect(ctdb, hdr);
+               break;
+
+       case CTDB_REQ_DMASTER:
+               ctdb_request_dmaster(ctdb, hdr);
+               break;
+
+       case CTDB_REPLY_DMASTER:
+               ctdb_reply_dmaster(ctdb, hdr);
+               break;
+
+       default:
+               printf("Packet with unknown operation %d\n", hdr->operation);
+               talloc_free(hdr);
+               break;
+       }
+}
+
+/*
+  called by the transport layer when a node is dead
+*/
+static void ctdb_node_dead(struct ctdb_node *node)
+{
+       node->ctdb->num_connected--;
+       printf("%s: node %s is dead: %d connected\n", 
+              node->ctdb->name, node->name, node->ctdb->num_connected);
+}
+
+/*
+  called by the transport layer when a node is dead
+*/
+static void ctdb_node_connected(struct ctdb_node *node)
+{
+       node->ctdb->num_connected++;
+       printf("%s: connected to %s - %d connected\n", 
+              node->ctdb->name, node->name, node->ctdb->num_connected);
+}
+
+/*
+  wait for all nodes to be connected
+*/
+void ctdb_connect_wait(struct ctdb_context *ctdb)
+{
+       int expected = ctdb->num_nodes - 1;
+       if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+               expected++;
+       }
+       while (ctdb->num_connected != expected) {
+               event_loop_once(ctdb->ev);
+       }
+}
+
+/*
+  wait until we're the only node left
+*/
+void ctdb_wait_loop(struct ctdb_context *ctdb)
+{
+       int expected = 0;
+       if (ctdb->flags & CTDB_FLAG_SELF_CONNECT) {
+               expected++;
+       }
+       while (ctdb->num_connected > expected) {
+               event_loop_once(ctdb->ev);
+       }
+}
+
+static const struct ctdb_upcalls ctdb_upcalls = {
+       .recv_pkt       = ctdb_recv_pkt,
+       .node_dead      = ctdb_node_dead,
+       .node_connected = ctdb_node_connected
+};
+
+/*
+  initialise the ctdb daemon. 
+
+  NOTE: In current code the daemon does not fork. This is for testing purposes only
+  and to simplify the code.
+*/
+struct ctdb_context *ctdb_init(struct event_context *ev)
+{
+       struct ctdb_context *ctdb;
+
+       ctdb = talloc_zero(ev, struct ctdb_context);
+       ctdb->ev = ev;
+       ctdb->upcalls = &ctdb_upcalls;
+       ctdb->idr = idr_init(ctdb);
+
+       return ctdb;
+}
+
diff --git a/source/cluster/ctdb/common/ctdb_call.c b/source/cluster/ctdb/common/ctdb_call.c
new file mode 100644 (file)
index 0000000..2bedccc
--- /dev/null
@@ -0,0 +1,653 @@
+/* 
+   ctdb_call protocol code
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+/*
+  see http://wiki.samba.org/index.php/Samba_%26_Clustering for
+  protocol design and packet details
+*/
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+
+/*
+  queue a packet or die
+*/
+static void ctdb_queue_packet(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_node *node;
+       DEBUG(0,("queueing destnode=%u srcnode=%u\n", hdr->destnode, hdr->srcnode));
+       node = ctdb->nodes[hdr->destnode];
+       if (ctdb->methods->queue_pkt(node, (uint8_t *)hdr, hdr->length) != 0) {
+               ctdb_fatal(ctdb, "Unable to queue packet\n");
+       }
+}
+
+
+/*
+  local version of ctdb_call
+*/
+static int ctdb_call_local(struct ctdb_context *ctdb, TDB_DATA key, 
+                          struct ctdb_ltdb_header *header, TDB_DATA *data,
+                          int call_id, TDB_DATA *call_data, TDB_DATA *reply_data,
+                          uint32_t caller)
+{
+       struct ctdb_call *c;
+       struct ctdb_registered_call *fn;
+
+       c = talloc(ctdb, struct ctdb_call);
+       CTDB_NO_MEMORY(ctdb, c);
+
+       c->key = key;
+       c->call_data = call_data;
+       c->record_data.dptr = talloc_memdup(c, data->dptr, data->dsize);
+       c->record_data.dsize = data->dsize;
+       CTDB_NO_MEMORY(ctdb, c->record_data.dptr);
+       c->new_data = NULL;
+       c->reply_data = NULL;
+
+       for (fn=ctdb->calls;fn;fn=fn->next) {
+               if (fn->id == call_id) break;
+       }
+       if (fn == NULL) {
+               ctdb_set_error(ctdb, "Unknown call id %u\n", call_id);
+               return -1;
+       }
+
+       if (fn->fn(c) != 0) {
+               ctdb_set_error(ctdb, "ctdb_call %u failed\n", call_id);
+               return -1;
+       }
+
+       if (header->laccessor != caller) {
+               header->lacount = 0;
+       }
+       header->laccessor = caller;
+       header->lacount++;
+
+       /* we need to force the record to be written out if this was a remote access,
+          so that the lacount is updated */
+       if (c->new_data == NULL && header->laccessor != ctdb->vnn) {
+               c->new_data = &c->record_data;
+       }
+
+       if (c->new_data) {
+               if (ctdb_ltdb_store(ctdb, key, header, *c->new_data) != 0) {
+                       ctdb_set_error(ctdb, "ctdb_call tdb_store failed\n");
+                       return -1;
+               }
+       }
+
+       if (reply_data) {
+               if (c->reply_data) {
+                       *reply_data = *c->reply_data;
+                       talloc_steal(ctdb, reply_data->dptr);
+               } else {
+                       reply_data->dptr = NULL;
+                       reply_data->dsize = 0;
+               }
+       }
+
+       talloc_free(c);
+
+       return 0;
+}
+
+/*
+  send an error reply
+*/
+static void ctdb_send_error(struct ctdb_context *ctdb, 
+                           struct ctdb_req_header *hdr, uint32_t status,
+                           const char *fmt, ...)
+{
+       va_list ap;
+       struct ctdb_reply_error *r;
+       char *msg;
+       int len;
+
+       va_start(ap, fmt);
+       msg = talloc_vasprintf(ctdb, fmt, ap);
+       if (msg == NULL) {
+               ctdb_fatal(ctdb, "Unable to allocate error in ctdb_send_error\n");
+       }
+       va_end(ap);
+
+       len = strlen(msg)+1;
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + len);
+       CTDB_NO_MEMORY_FATAL(ctdb, r);
+       r->hdr.length = sizeof(*r) + len;
+       r->hdr.operation = CTDB_REPLY_ERROR;
+       r->hdr.destnode  = hdr->srcnode;
+       r->hdr.srcnode   = ctdb->vnn;
+       r->hdr.reqid     = hdr->reqid;
+       r->status        = status;
+       r->msglen        = len;
+       memcpy(&r->msg[0], msg, len);
+
+       talloc_free(msg);
+
+       ctdb_queue_packet(ctdb, &r->hdr);
+
+       talloc_free(r);
+}
+
+
+/*
+  send a redirect reply
+*/
+static void ctdb_call_send_redirect(struct ctdb_context *ctdb, 
+                                   struct ctdb_req_call *c, 
+                                   struct ctdb_ltdb_header *header)
+{
+       struct ctdb_reply_redirect *r;
+
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r));
+       CTDB_NO_MEMORY_FATAL(ctdb, r);
+       r->hdr.length = sizeof(*r);
+       r->hdr.operation = CTDB_REPLY_REDIRECT;
+       r->hdr.destnode  = c->hdr.srcnode;
+       r->hdr.srcnode   = ctdb->vnn;
+       r->hdr.reqid     = c->hdr.reqid;
+       r->dmaster       = header->dmaster;
+
+       ctdb_queue_packet(ctdb, &r->hdr);
+
+       talloc_free(r);
+}
+
+/*
+  send a dmaster request (give another node the dmaster for a record)
+
+  This is always sent to the lmaster, which ensures that the lmaster
+  always knows who the dmaster is. The lmaster will then send a
+  CTDB_REPLY_DMASTER to the new dmaster
+*/
+static void ctdb_call_send_dmaster(struct ctdb_context *ctdb, 
+                                  struct ctdb_req_call *c, 
+                                  struct ctdb_ltdb_header *header,
+                                  TDB_DATA *key, TDB_DATA *data)
+{
+       struct ctdb_req_dmaster *r;
+       int len;
+       
+       len = sizeof(*r) + key->dsize + data->dsize;
+       r = ctdb->methods->allocate_pkt(ctdb, len);
+       CTDB_NO_MEMORY_FATAL(ctdb, r);
+       r->hdr.length    = len;
+       r->hdr.operation = CTDB_REQ_DMASTER;
+       r->hdr.destnode  = ctdb_lmaster(ctdb, key);
+       r->hdr.srcnode   = ctdb->vnn;
+       r->hdr.reqid     = c->hdr.reqid;
+       r->dmaster       = header->laccessor;
+       r->keylen        = key->dsize;
+       r->datalen       = data->dsize;
+       memcpy(&r->data[0], key->dptr, key->dsize);
+       memcpy(&r->data[key->dsize], data->dptr, data->dsize);
+
+       if (r->hdr.destnode == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+               /* we are the lmaster - don't send to ourselves */
+               DEBUG(0,("XXXX local ctdb_req_dmaster\n"));
+               ctdb_request_dmaster(ctdb, &r->hdr);
+       } else {
+               ctdb_queue_packet(ctdb, &r->hdr);
+
+               /* update the ltdb to record the new dmaster */
+               header->dmaster = r->hdr.destnode;
+               ctdb_ltdb_store(ctdb, *key, header, *data);
+       }
+
+       talloc_free(r);
+}
+
+
+/*
+  called when a CTDB_REQ_DMASTER packet comes in
+
+  this comes into the lmaster for a record when the current dmaster
+  wants to give up the dmaster role and give it to someone else
+*/
+void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_req_dmaster *c = (struct ctdb_req_dmaster *)hdr;
+       struct ctdb_reply_dmaster *r;
+       TDB_DATA key, data;
+       struct ctdb_ltdb_header header;
+       int ret;
+
+       key.dptr = c->data;
+       key.dsize = c->keylen;
+       data.dptr = c->data + c->keylen;
+       data.dsize = c->datalen;
+
+       DEBUG(0,("request dmaster reqid=%d\n", hdr->reqid));
+
+       /* fetch the current record */
+       ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
+       if (ret != 0) {
+               ctdb_fatal(ctdb, "ctdb_req_dmaster failed to fetch record");
+               return;
+       }
+
+       {
+               int i, fd = open("/dev/null", O_WRONLY);
+               for (i=0;i<data.dsize;i++) {
+                       write(fd, &data.dptr[i], 1);
+               }
+               close(fd);
+       }
+
+       /* its a protocol error if the sending node is not the current dmaster */
+       if (header.dmaster != hdr->srcnode) {
+               ctdb_fatal(ctdb, "dmaster request from non-master");
+               return;
+       }
+
+       DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__));
+
+       header.dmaster = c->dmaster;
+       if (ctdb_ltdb_store(ctdb, key, &header, data) != 0) {
+               ctdb_fatal(ctdb, "ctdb_req_dmaster unable to update dmaster");
+               return;
+       }
+
+       {
+               int i, fd = open("/dev/null", O_WRONLY);
+               for (i=0;i<data.dsize;i++) {
+                       write(fd, &data.dptr[i], 1);
+               }
+               close(fd);
+       }
+
+       /* send the CTDB_REPLY_DMASTER */
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + data.dsize);
+       CTDB_NO_MEMORY_FATAL(ctdb, r);
+       r->hdr.length = sizeof(*r) + data.dsize;
+       r->hdr.operation = CTDB_REPLY_DMASTER;
+       r->hdr.destnode  = c->dmaster;
+       r->hdr.srcnode   = ctdb->vnn;
+       r->hdr.reqid     = hdr->reqid;
+       r->datalen       = data.dsize;
+       memcpy(&r->data[0], data.dptr, data.dsize);
+
+       {
+               int i, fd = open("/dev/null", O_WRONLY);
+               for (i=0;i<data.dsize;i++) {
+                       write(fd, &data.dptr[i], 1);
+               }
+               close(fd);
+       }
+
+       DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__));
+
+       if (0 && r->hdr.destnode == r->hdr.srcnode) {
+               ctdb_reply_dmaster(ctdb, &r->hdr);
+       } else {
+               ctdb_queue_packet(ctdb, &r->hdr);
+               DEBUG(0,("request dmaster reqid=%d %s\n", hdr->reqid, __location__));
+
+               talloc_free(r);
+       }
+}
+
+
+/*
+  called when a CTDB_REQ_CALL packet comes in
+*/
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_req_call *c = (struct ctdb_req_call *)hdr;
+       TDB_DATA key, data, call_data, reply_data;
+       struct ctdb_reply_call *r;
+       int ret;
+       struct ctdb_ltdb_header header;
+
+       key.dptr = c->data;
+       key.dsize = c->keylen;
+       call_data.dptr = c->data + c->keylen;
+       call_data.dsize = c->calldatalen;
+
+       /* determine if we are the dmaster for this key. This also
+          fetches the record data (if any), thus avoiding a 2nd fetch of the data 
+          if the call will be answered locally */
+       ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
+       if (ret != 0) {
+               ctdb_send_error(ctdb, hdr, ret, "ltdb fetch failed in ctdb_request_call");
+               return;
+       }
+
+       /* if we are not the dmaster, then send a redirect to the
+          requesting node */
+       if (header.dmaster != ctdb->vnn) {
+               ctdb_call_send_redirect(ctdb, c, &header);
+               talloc_free(data.dptr);
+               return;
+       }
+
+       /* if this nodes has done enough consecutive calls on the same record
+          then give them the record */
+       if (header.laccessor == c->hdr.srcnode &&
+           header.lacount >= CTDB_MAX_LACOUNT) {
+               ctdb_call_send_dmaster(ctdb, c, &header, &key, &data);
+               talloc_free(data.dptr);
+               return;
+       }
+
+       ctdb_call_local(ctdb, key, &header, &data, c->callid, 
+                       call_data.dsize?&call_data:NULL,
+                       &reply_data, c->hdr.srcnode);
+
+       r = ctdb->methods->allocate_pkt(ctdb, sizeof(*r) + reply_data.dsize);
+       CTDB_NO_MEMORY_FATAL(ctdb, r);
+       r->hdr.length = sizeof(*r) + reply_data.dsize;
+       r->hdr.operation = CTDB_REPLY_CALL;
+       r->hdr.destnode  = hdr->srcnode;
+       r->hdr.srcnode   = hdr->destnode;
+       r->hdr.reqid     = hdr->reqid;
+       r->datalen       = reply_data.dsize;
+       memcpy(&r->data[0], reply_data.dptr, reply_data.dsize);
+
+       ctdb_queue_packet(ctdb, &r->hdr);
+
+       talloc_free(reply_data.dptr);
+       talloc_free(r);
+}
+
+enum call_state {CTDB_CALL_WAIT, CTDB_CALL_DONE, CTDB_CALL_ERROR};
+
+/*
+  state of a in-progress ctdb call
+*/
+struct ctdb_call_state {
+       enum call_state state;
+       struct ctdb_req_call *c;
+       struct ctdb_node *node;
+       const char *errmsg;
+       TDB_DATA call_data;
+       TDB_DATA reply_data;
+       TDB_DATA key;
+       int redirect_count;
+       struct ctdb_ltdb_header header;
+};
+
+
+/*
+  called when a CTDB_REPLY_CALL packet comes in
+
+  This packet comes in response to a CTDB_REQ_CALL request packet. It
+  contains any reply data freom the call
+*/
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_reply_call *c = (struct ctdb_reply_call *)hdr;
+       struct ctdb_call_state *state;
+       TDB_DATA reply_data;
+
+       state = idr_find(ctdb->idr, hdr->reqid);
+
+       reply_data.dptr = c->data;
+       reply_data.dsize = c->datalen;
+
+       state->reply_data = reply_data;
+
+       talloc_steal(state, c);
+
+       state->state = CTDB_CALL_DONE;
+}
+
+/*
+  called when a CTDB_REPLY_DMASTER packet comes in
+
+  This packet comes in from the lmaster response to a CTDB_REQ_CALL
+  request packet. It means that the current dmaster wants to give us
+  the dmaster role
+*/
+void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_reply_dmaster *c = (struct ctdb_reply_dmaster *)hdr;
+       struct ctdb_call_state *state;
+       TDB_DATA data;
+
+       state = idr_find(ctdb->idr, hdr->reqid);
+
+       data.dptr = c->data;
+       data.dsize = c->datalen;
+
+       talloc_steal(state, c);
+
+       /* we're now the dmaster - update our local ltdb with new header
+          and data */
+       state->header.dmaster = ctdb->vnn;
+
+       if (ctdb_ltdb_store(ctdb, state->key, &state->header, data) != 0) {
+               ctdb_fatal(ctdb, "ctdb_reply_dmaster store failed\n");
+               return;
+       }
+
+       ctdb_call_local(ctdb, state->key, &state->header, &data, state->c->callid,
+                       state->call_data.dsize?&state->call_data:NULL,
+                       &state->reply_data, ctdb->vnn);
+
+       state->state = CTDB_CALL_DONE;
+}
+
+
+/*
+  called when a CTDB_REPLY_ERROR packet comes in
+*/
+void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_reply_error *c = (struct ctdb_reply_error *)hdr;
+       struct ctdb_call_state *state;
+
+       state = idr_find(ctdb->idr, hdr->reqid);
+
+       talloc_steal(state, c);
+
+       state->state  = CTDB_CALL_ERROR;
+       state->errmsg = (char *)c->msg;
+}
+
+
+/*
+  called when a CTDB_REPLY_REDIRECT packet comes in
+
+  This packet arrives when we have sent a CTDB_REQ_CALL request and
+  the node that received it is not the dmaster for the given key. We
+  are given a hint as to what node to try next.
+*/
+void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr)
+{
+       struct ctdb_reply_redirect *c = (struct ctdb_reply_redirect *)hdr;
+       struct ctdb_call_state *state;
+
+       state = idr_find(ctdb->idr, hdr->reqid);
+
+       talloc_steal(state, c);
+       
+       /* don't allow for too many redirects */
+       if (state->redirect_count++ == CTDB_MAX_REDIRECT) {
+               c->dmaster = ctdb_lmaster(ctdb, &state->key);
+       }
+
+       /* send it off again */
+       state->node = ctdb->nodes[c->dmaster];
+
+       ctdb_queue_packet(ctdb, &state->c->hdr);
+}
+
+/*
+  destroy a ctdb_call
+*/
+static int ctdb_call_destructor(struct ctdb_call_state *state)
+{
+//     idr_remove(state->node->ctdb->idr, state->c->hdr.reqid);
+       return 0;
+}
+
+
+/*
+  called when a ctdb_call times out
+*/
+void ctdb_call_timeout(struct event_context *ev, struct timed_event *te, 
+                      struct timeval t, void *private)
+{
+       struct ctdb_call_state *state = talloc_get_type(private, struct ctdb_call_state);
+       state->state = CTDB_CALL_ERROR;
+       ctdb_set_error(state->node->ctdb, "ctdb_call %u timed out",
+                      state->c->hdr.reqid);
+}
+
+/*
+  construct an event driven local ctdb_call
+
+  this is used so that locally processed ctdb_call requests are processed
+  in an event driven manner
+*/
+struct ctdb_call_state *ctdb_call_local_send(struct ctdb_context *ctdb, 
+                                            TDB_DATA key, int call_id, 
+                                            TDB_DATA *call_data, TDB_DATA *reply_data,
+                                            struct ctdb_ltdb_header *header,
+                                            TDB_DATA *data)
+{
+       struct ctdb_call_state *state;
+       int ret;
+
+       state = talloc_zero(ctdb, struct ctdb_call_state);
+       CTDB_NO_MEMORY_NULL(ctdb, state);
+
+       state->state = CTDB_CALL_DONE;
+       state->node = ctdb->nodes[ctdb->vnn];
+
+       ret = ctdb_call_local(ctdb, key, header, data, 
+                             call_id, call_data, &state->reply_data, 
+                             ctdb->vnn);
+       return state;
+}
+
+
+/*
+  make a remote ctdb call - async send
+
+  This constructs a ctdb_call request and queues it for processing. 
+  This call never blocks.
+*/
+struct ctdb_call_state *ctdb_call_send(struct ctdb_context *ctdb, 
+                                      TDB_DATA key, int call_id, 
+                                      TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       uint32_t len;
+       struct ctdb_call_state *state;
+       int ret;
+       struct ctdb_ltdb_header header;
+       TDB_DATA data;
+
+       /*
+         if we are the dmaster for this key then we don't need to
+         send it off at all, we can bypass the network and handle it
+         locally. To find out if we are the dmaster we need to look
+         in our ltdb
+       */
+       ret = ctdb_ltdb_fetch(ctdb, key, &header, &data);
+       if (ret != 0) return NULL;
+
+       if (header.dmaster == ctdb->vnn && !(ctdb->flags & CTDB_FLAG_SELF_CONNECT)) {
+               return ctdb_call_local_send(ctdb, key, call_id, call_data, reply_data,
+                                           &header, &data);
+       }
+
+       state = talloc_zero(ctdb, struct ctdb_call_state);
+       CTDB_NO_MEMORY_NULL(ctdb, state);
+
+       len = sizeof(*state->c) + key.dsize + (call_data?call_data->dsize:0);
+       state->c = ctdb->methods->allocate_pkt(ctdb, len);
+       CTDB_NO_MEMORY_NULL(ctdb, state->c);
+
+       state->c->hdr.length    = len;
+       state->c->hdr.operation = CTDB_REQ_CALL;
+       state->c->hdr.destnode  = header.dmaster;
+       state->c->hdr.srcnode   = ctdb->vnn;
+       /* this limits us to 16k outstanding messages - not unreasonable */
+       state->c->hdr.reqid     = idr_get_new(ctdb->idr, state, 0xFFFF);
+       DEBUG(0,("Allocate reqid %u\n", state->c->hdr.reqid));
+       state->c->callid        = call_id;
+       state->c->keylen        = key.dsize;
+       state->c->calldatalen   = call_data?call_data->dsize:0;
+       memcpy(&state->c->data[0], key.dptr, key.dsize);
+       if (call_data) {
+               memcpy(&state->c->data[key.dsize], call_data->dptr, call_data->dsize);
+               state->call_data.dptr = &state->c->data[key.dsize];
+               state->call_data.dsize = call_data->dsize;
+       }
+       state->key.dptr         = &state->c->data[0];
+       state->key.dsize        = key.dsize;
+
+       state->node   = ctdb->nodes[header.dmaster];
+       state->state  = CTDB_CALL_WAIT;
+       state->header = header;
+
+       talloc_set_destructor(state, ctdb_call_destructor);
+
+       ctdb_queue_packet(ctdb, &state->c->hdr);
+
+       event_add_timed(ctdb->ev, state, timeval_current_ofs(CTDB_REQ_TIMEOUT, 0), 
+                       ctdb_call_timeout, state);
+       return state;
+}
+
+
+/*
+  make a remote ctdb call - async recv. 
+
+  This is called when the program wants to wait for a ctdb_call to complete and get the 
+  results. This call will block unless the call has already completed.
+*/
+int ctdb_call_recv(struct ctdb_call_state *state, TDB_DATA *reply_data)
+{
+       while (state->state < CTDB_CALL_DONE) {
+               event_loop_once(state->node->ctdb->ev);
+       }
+       if (state->state != CTDB_CALL_DONE) {
+               ctdb_set_error(state->node->ctdb, "%s", state->errmsg);
+               talloc_free(state);
+               return -1;
+       }
+       if (reply_data) {
+               reply_data->dptr = talloc_memdup(state->node->ctdb,
+                                                state->reply_data.dptr,
+                                                state->reply_data.dsize);
+               reply_data->dsize = state->reply_data.dsize;
+       }
+       talloc_free(state);
+       return 0;
+}
+
+/*
+  full ctdb_call. Equivalent to a ctdb_call_send() followed by a ctdb_call_recv()
+*/
+int ctdb_call(struct ctdb_context *ctdb, 
+             TDB_DATA key, int call_id, 
+             TDB_DATA *call_data, TDB_DATA *reply_data)
+{
+       struct ctdb_call_state *state;
+       state = ctdb_call_send(ctdb, key, call_id, call_data, reply_data);
+       return ctdb_call_recv(state, reply_data);
+}
diff --git a/source/cluster/ctdb/common/ctdb_ltdb.c b/source/cluster/ctdb/common/ctdb_ltdb.c
new file mode 100644 (file)
index 0000000..bc15a3e
--- /dev/null
@@ -0,0 +1,139 @@
+/* 
+   ctdb ltdb code
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+/*
+  attach to a specific database
+*/
+int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
+               int open_flags, mode_t mode)
+{
+       /* when we have a separate daemon this will need to be a real
+          file, not a TDB_INTERNAL, so the parent can access it to
+          for ltdb bypass */
+       ctdb->ltdb = tdb_open(name, 0, /* tdb_flags */ TDB_INTERNAL, open_flags, mode);
+       if (ctdb->ltdb == NULL) {
+               ctdb_set_error(ctdb, "Failed to open tdb %s\n", name);
+               return -1;
+       }
+       return 0;
+}
+
+/*
+  return the lmaster given a key
+*/
+uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key)
+{
+       return ctdb_hash(key) % ctdb->num_nodes;
+}
+
+
+/*
+  construct an initial header for a record with no ltdb header yet
+*/
+static void ltdb_initial_header(struct ctdb_context *ctdb, 
+                               TDB_DATA key,
+                               struct ctdb_ltdb_header *header)
+{
+       header->rsn = 0;
+       /* initial dmaster is the lmaster */
+       header->dmaster = ctdb_lmaster(ctdb, &key);
+       header->laccessor = header->dmaster;
+       header->lacount = 0;
+}
+
+
+/*
+  fetch a record from the ltdb, separating out the header information
+  and returning the body of the record. A valid (initial) header is
+  returned if the record is not present
+*/
+int ctdb_ltdb_fetch(struct ctdb_context *ctdb, 
+                   TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data)
+{
+       TDB_DATA rec;
+
+       rec = tdb_fetch(ctdb->ltdb, key);
+       if (rec.dsize < sizeof(*header)) {
+               /* return an initial header */
+               free(rec.dptr);
+               ltdb_initial_header(ctdb, key, header);
+               data->dptr = NULL;
+               data->dsize = 0;
+               return 0;
+       }
+
+       *header = *(struct ctdb_ltdb_header *)rec.dptr;
+
+       data->dsize = rec.dsize - sizeof(struct ctdb_ltdb_header);
+       data->dptr = talloc_memdup(ctdb, sizeof(struct ctdb_ltdb_header)+rec.dptr,
+                                  data->dsize);
+       free(rec.dptr);
+       CTDB_NO_MEMORY(ctdb, data->dptr);
+
+       {
+               int i, fd = open("/dev/null", O_WRONLY);
+               for (i=0;i<data->dsize;i++) {
+                       write(fd, &data->dptr[i], 1);
+               }
+               close(fd);
+       }
+
+       return 0;
+}
+
+
+/*
+  fetch a record from the ltdb, separating out the header information
+  and returning the body of the record. A valid (initial) header is
+  returned if the record is not present
+*/
+int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key, 
+                   struct ctdb_ltdb_header *header, TDB_DATA data)
+{
+       TDB_DATA rec;
+       int ret;
+
+       rec.dsize = sizeof(*header) + data.dsize;
+       rec.dptr = talloc_size(ctdb, rec.dsize);
+       CTDB_NO_MEMORY(ctdb, rec.dptr);
+
+       memcpy(rec.dptr, header, sizeof(*header));
+       memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+
+       {
+               int i, fd = open("/dev/null", O_WRONLY);
+               for (i=0;i<rec.dsize;i++) {
+                       write(fd, &rec.dptr[i], 1);
+               }
+               close(fd);
+       }
+       
+       ret = tdb_store(ctdb->ltdb, key, rec, TDB_REPLACE);
+       talloc_free(rec.dptr);
+
+       return ret;
+}
diff --git a/source/cluster/ctdb/common/ctdb_util.c b/source/cluster/ctdb/common/ctdb_util.c
new file mode 100644 (file)
index 0000000..8e25759
--- /dev/null
@@ -0,0 +1,103 @@
+/* 
+   ctdb utility code
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+
+/*
+  return error string for last error
+*/
+const char *ctdb_errstr(struct ctdb_context *ctdb)
+{
+       return ctdb->err_msg;
+}
+
+
+/*
+  remember an error message
+*/
+void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...)
+{
+       va_list ap;
+       talloc_free(ctdb->err_msg);
+       va_start(ap, fmt);
+       ctdb->err_msg = talloc_vasprintf(ctdb, fmt, ap);
+       DEBUG(0,("ctdb error: %s\n", ctdb->err_msg));
+       va_end(ap);
+}
+
+
+/*
+  a fatal internal error occurred - no hope for recovery
+*/
+void ctdb_fatal(struct ctdb_context *ctdb, const char *msg)
+{
+       DEBUG(0,("ctdb fatal error: %s\n", msg));
+       fprintf(stderr, "ctdb fatal error: '%s'\n", msg);
+       abort();
+}
+
+/*
+  parse a IP:port pair
+*/
+int ctdb_parse_address(struct ctdb_context *ctdb,
+                      TALLOC_CTX *mem_ctx, const char *str,
+                      struct ctdb_address *address)
+{
+       char *p;
+       p = strchr(str, ':');
+       if (p == NULL) {
+               ctdb_set_error(ctdb, "Badly formed node '%s'\n", str);
+               return -1;
+       }
+       
+       address->address = talloc_strndup(mem_ctx, str, p-str);
+       address->port = strtoul(p+1, NULL, 0);
+       return 0;
+}
+
+
+/*
+  check if two addresses are the same
+*/
+bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2)
+{
+       return strcmp(a1->address, a2->address) == 0 && a1->port == a2->port;
+}
+
+
+/*
+  hash function for mapping data to a VNN - taken from tdb
+*/
+uint32_t ctdb_hash(const TDB_DATA *key)
+{
+       uint32_t value; /* Used to compute the hash value.  */
+       uint32_t i;     /* Used to cycle through random values. */
+
+       /* Set the initial value from the key size. */
+       for (value = 0x238F13AF * key->dsize, i=0; i < key->dsize; i++)
+               value = (value + (key->dptr[i] << (i*5 % 24)));
+
+       return (1103515243 * value + 12345);  
+}
diff --git a/source/cluster/ctdb/config.mk b/source/cluster/ctdb/config.mk
new file mode 100644 (file)
index 0000000..90897ee
--- /dev/null
@@ -0,0 +1,24 @@
+##################
+[MODULE::brlock_ctdb]
+SUBSYSTEM = ntvfs_common
+OBJ_FILES = brlock_ctdb.o
+
+##################
+[MODULE::ctdb_tcp]
+SUBSYSTEM = CLUSTER
+OBJ_FILES = \
+               tcp/tcp_init.o \
+               tcp/tcp_io.o \
+               tcp/tcp_connect.o
+
+##################
+[MODULE::ctdb]
+SUBSYSTEM = CLUSTER
+OBJ_FILES = \
+               ctdb_cluster.o \
+               common/ctdb.o \
+               common/ctdb_call.o \
+               common/ctdb_ltdb.o \
+               common/ctdb_util.o
+PRIVATE_DEPENDENCIES = ctdb_tcp
+PUBLIC_DEPENDENCIES = LIBTDB LIBTALLOC
diff --git a/source/cluster/ctdb/ctdb_cluster.c b/source/cluster/ctdb/ctdb_cluster.c
new file mode 100644 (file)
index 0000000..df16f2f
--- /dev/null
@@ -0,0 +1,138 @@
+/* 
+   Unix SMB/CIFS implementation.
+
+   ctdb clustering hooks
+
+   Copyright (C) Andrew Tridgell 2006
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "cluster/cluster.h"
+#include "system/filesys.h"
+#include "cluster/cluster_private.h"
+#include "lib/tdb/include/tdb.h"
+#include "cluster/ctdb/include/ctdb.h"
+
+struct cluster_state {
+       struct ctdb_context *ctdb;
+};
+
+
+/*
+  return a server_id for a ctdb node
+*/
+static struct server_id ctdb_id(struct cluster_ops *ops, uint32_t id)
+{
+       struct ctdb_context *ctdb = ops->private;
+       struct server_id server_id;
+       server_id.node = ctdb_get_vnn(ctdb);
+       server_id.id = id;
+       return server_id;
+}
+
+
+/*
+  return a server_id as a string
+*/
+static const char *ctdb_id_string(struct cluster_ops *ops, 
+                                 TALLOC_CTX *mem_ctx, struct server_id id)
+{
+       return talloc_asprintf(mem_ctx, "%u.%u", id.node, id.id);
+}
+
+static struct cluster_ops cluster_ctdb_ops = {
+       .cluster_id        = ctdb_id,
+       .cluster_id_string = ctdb_id_string,
+       .private           = NULL
+};
+
+/* initialise ctdb */
+void cluster_ctdb_init(struct event_context *ev)
+{
+       const char *nlist;
+       const char *address;
+       const char *transport;
+       struct cluster_state *state;
+       int ret;
+
+       nlist = lp_parm_string(-1, "ctdb", "nlist");
+       if (nlist == NULL) return;
+
+       address = lp_parm_string(-1, "ctdb", "address");
+       if (address == NULL) return;
+
+       transport = lp_parm_string(-1, "ctdb", "transport");
+       if (transport == NULL) {
+               transport = "tcp";
+       }
+
+       state = talloc(ev, struct cluster_state);
+       if (state == NULL) goto failed;
+
+       state->ctdb = ctdb_init(ev);
+       if (state->ctdb == NULL) goto failed;
+
+       cluster_ctdb_ops.private = state->ctdb;
+
+       ret = ctdb_set_transport(state->ctdb, transport);
+       if (ret == -1) {
+               DEBUG(0,("ctdb_set_transport failed - %s\n",
+                        ctdb_errstr(state->ctdb)));
+               goto failed;
+       }
+       
+//     ctdb_set_flags(state->ctdb, CTDB_FLAG_SELF_CONNECT);
+
+       /* tell ctdb what address to listen on */
+        ret = ctdb_set_address(state->ctdb, address);
+        if (ret == -1) {
+                DEBUG(0,("ctdb_set_address failed - %s\n", ctdb_errstr(state->ctdb)));
+               goto failed;
+        }
+
+        /* tell ctdb what nodes are available */
+        ret = ctdb_set_nlist(state->ctdb, nlist);
+        if (ret == -1) {
+                DEBUG(0,("ctdb_set_nlist failed - %s\n", ctdb_errstr(state->ctdb)));
+               goto failed;
+        }
+
+       ret = ctdb_attach(state->ctdb, "cluster.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       if (ret == -1) {
+               DEBUG(0,("ctdb_attach failed - %s\n", ctdb_errstr(state->ctdb)));
+               goto failed;
+       }
+
+       /* start the protocol running */
+       ret = ctdb_start(state->ctdb);
+        if (ret == -1) {
+                DEBUG(0,("ctdb_start failed - %s\n", ctdb_errstr(state->ctdb)));
+               goto failed;
+        }
+
+       /* wait until all nodes are connected (should not be needed
+          outide of test code) */
+       ctdb_connect_wait(state->ctdb);
+
+       cluster_set_ops(&cluster_ctdb_ops);
+       return;
+       
+failed:
+       DEBUG(0,("cluster_ctdb_init failed\n"));
+       talloc_free(state);
+}
diff --git a/source/cluster/ctdb/ctdb_cluster.h b/source/cluster/ctdb/ctdb_cluster.h
new file mode 100644 (file)
index 0000000..5f93df9
--- /dev/null
@@ -0,0 +1,23 @@
+/* 
+   Unix SMB/CIFS implementation.
+
+   ctdb clustering hooks - header
+
+   Copyright (C) Andrew Tridgell 2006
+   
+   This program is free software; you can redistribute it and/or modify
+   it under the terms of the GNU General Public License as published by
+   the Free Software Foundation; either version 2 of the License, or
+   (at your option) any later version.
+   
+   This program is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+   GNU General Public License for more details.
+   
+   You should have received a copy of the GNU General Public License
+   along with this program; if not, write to the Free Software
+   Foundation, Inc., 675 Mass Ave, Cambridge, MA 02139, USA.
+*/
+
+void cluster_ctdb_init(struct event_context *ev);
diff --git a/source/cluster/ctdb/include/ctdb.h b/source/cluster/ctdb/include/ctdb.h
new file mode 100644 (file)
index 0000000..21b9b5d
--- /dev/null
@@ -0,0 +1,117 @@
+/* 
+   ctdb database library
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#ifndef _CTDB_H
+#define _CTDB_H
+
+/*
+  structure passed to a ctdb call function
+*/
+struct ctdb_call {
+       TDB_DATA key;          /* record key */
+       TDB_DATA record_data;  /* current data in the record */
+       TDB_DATA *new_data;    /* optionally updated record data */
+       TDB_DATA *call_data;   /* optionally passed from caller */
+       TDB_DATA *reply_data;  /* optionally returned by function */
+};
+
+#define CTDB_ERR_INVALID 1
+#define CTDB_ERR_NOMEM 2
+
+/*
+  ctdb flags
+*/
+#define CTDB_FLAG_SELF_CONNECT (1<<0)
+
+
+struct event_context;
+
+/*
+  initialise ctdb subsystem
+*/
+struct ctdb_context *ctdb_init(struct event_context *ev);
+
+/*
+  choose the transport
+*/
+int ctdb_set_transport(struct ctdb_context *ctdb, const char *transport);
+
+/*
+  set some flags
+*/
+void ctdb_set_flags(struct ctdb_context *ctdb, unsigned flags);
+
+/*
+  tell ctdb what address to listen on, in transport specific format
+*/
+int ctdb_set_address(struct ctdb_context *ctdb, const char *address);
+
+/*
+  tell ctdb what nodes are available. This takes a filename, which will contain
+  1 node address per line, in a transport specific format
+*/
+int ctdb_set_nlist(struct ctdb_context *ctdb, const char *nlist);
+
+/*
+  start the ctdb protocol
+*/
+int ctdb_start(struct ctdb_context *ctdb);
+
+/*
+  error string for last ctdb error
+*/
+const char *ctdb_errstr(struct ctdb_context *);
+
+/* a ctdb call function */
+typedef int (*ctdb_fn_t)(struct ctdb_call *);
+
+/*
+  setup a ctdb call function
+*/
+int ctdb_set_call(struct ctdb_context *ctdb, ctdb_fn_t fn, int id);
+
+/*
+  attach to a ctdb database
+*/
+int ctdb_attach(struct ctdb_context *ctdb, const char *name, int tdb_flags, 
+               int open_flags, mode_t mode);
+
+
+/*
+  make a ctdb call. The associated ctdb call function will be called on the DMASTER
+  for the given record
+*/
+int ctdb_call(struct ctdb_context *ctdb, TDB_DATA key, int call_id, 
+             TDB_DATA *call_data, TDB_DATA *reply_data);
+
+/*
+  wait for all nodes to be connected - useful for test code
+*/
+void ctdb_connect_wait(struct ctdb_context *ctdb);
+
+/*
+  wait until we're the only node left
+*/
+void ctdb_wait_loop(struct ctdb_context *ctdb);
+
+/* return vnn of this node */
+uint32_t ctdb_get_vnn(struct ctdb_context *ctdb);
+
+#endif
diff --git a/source/cluster/ctdb/include/ctdb_private.h b/source/cluster/ctdb/include/ctdb_private.h
new file mode 100644 (file)
index 0000000..d373e3a
--- /dev/null
@@ -0,0 +1,216 @@
+/* 
+   ctdb database library
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#ifndef _CTDB_PRIVATE_H
+#define _CTDB_PRIVATE_H
+
+#include "ctdb.h"
+
+/*
+  an installed ctdb remote call
+*/
+struct ctdb_registered_call {
+       struct ctdb_registered_call *next, *prev;
+       uint32_t id;
+       ctdb_fn_t fn;
+};
+
+/*
+  this address structure might need to be generalised later for some
+  transports
+*/
+struct ctdb_address {
+       const char *address;
+       int port;
+};
+
+/*
+  state associated with one node
+*/
+struct ctdb_node {
+       struct ctdb_context *ctdb;
+       struct ctdb_address address;
+       const char *name; /* for debug messages */
+       void *private; /* private to transport */
+       uint32_t vnn;
+};
+
+/*
+  transport specific methods
+*/
+struct ctdb_methods {
+       int (*start)(struct ctdb_context *); /* start protocol processing */    
+       int (*add_node)(struct ctdb_node *); /* setup a new node */     
+       int (*queue_pkt)(struct ctdb_node *, uint8_t *data, uint32_t length);
+       void *(*allocate_pkt)(struct ctdb_context *, size_t );
+};
+
+/*
+  transport calls up to the ctdb layer
+*/
+struct ctdb_upcalls {
+       /* recv_pkt is called when a packet comes in */
+       void (*recv_pkt)(struct ctdb_context *, uint8_t *data, uint32_t length);
+
+       /* node_dead is called when an attempt to send to a node fails */
+       void (*node_dead)(struct ctdb_node *);
+
+       /* node_connected is called when a connection to a node is established */
+       void (*node_connected)(struct ctdb_node *);
+};
+
+/* main state of the ctdb daemon */
+struct ctdb_context {
+       struct event_context *ev;
+       struct ctdb_address address;
+       const char *name;
+       uint32_t vnn; /* our own vnn */
+       uint32_t num_nodes;
+       uint32_t num_connected;
+       unsigned flags;
+       struct idr_context *idr;
+       struct ctdb_node **nodes; /* array of nodes in the cluster - indexed by vnn */
+       struct ctdb_registered_call *calls; /* list of registered calls */
+       char *err_msg;
+       struct tdb_context *ltdb;
+       const struct ctdb_methods *methods; /* transport methods */
+       const struct ctdb_upcalls *upcalls; /* transport upcalls */
+       void *private; /* private to transport */
+};
+
+#define CTDB_NO_MEMORY(ctdb, p) do { if (!(p)) { \
+          ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
+         return -1; }} while (0)
+
+#define CTDB_NO_MEMORY_NULL(ctdb, p) do { if (!(p)) { \
+          ctdb_set_error(ctdb, "Out of memory at %s:%d", __FILE__, __LINE__); \
+         return NULL; }} while (0)
+
+#define CTDB_NO_MEMORY_FATAL(ctdb, p) do { if (!(p)) { \
+          ctdb_fatal(ctdb, "Out of memory in " __location__ ); \
+         }} while (0)
+
+/* arbitrary maximum timeout for ctdb operations */
+#define CTDB_REQ_TIMEOUT 10
+
+/* max number of redirects before we ask the lmaster */
+#define CTDB_MAX_REDIRECT 2
+
+/* number of consecutive calls from the same node before we give them
+   the record */
+#define CTDB_MAX_LACOUNT 7000
+
+/*
+  the extended header for records in the ltdb
+*/
+struct ctdb_ltdb_header {
+       uint64_t rsn;
+       uint32_t dmaster;
+       uint32_t laccessor;
+       uint32_t lacount;
+};
+
+
+/*
+  operation IDs
+*/
+enum ctdb_operation {
+       CTDB_REQ_CALL       = 0,
+       CTDB_REPLY_CALL     = 1,
+       CTDB_REPLY_REDIRECT = 2,
+       CTDB_REQ_DMASTER    = 3,
+       CTDB_REPLY_DMASTER  = 4,
+       CTDB_REPLY_ERROR    = 5
+};
+
+/*
+  packet structures
+*/
+struct ctdb_req_header {
+       uint32_t length;
+       uint32_t operation;
+       uint32_t destnode;
+       uint32_t srcnode;
+       uint32_t reqid;
+};
+
+struct ctdb_req_call {
+       struct ctdb_req_header hdr;
+       uint32_t callid;
+       uint32_t keylen;
+       uint32_t calldatalen;
+       uint8_t data[0]; /* key[] followed by calldata[] */
+};
+
+struct ctdb_reply_call {
+       struct ctdb_req_header hdr;
+       uint32_t datalen;
+       uint8_t  data[0];
+};
+
+struct ctdb_reply_error {
+       struct ctdb_req_header hdr;
+       uint32_t status;
+       uint32_t msglen;
+       uint8_t  msg[0];
+};
+
+struct ctdb_reply_redirect {
+       struct ctdb_req_header hdr;
+       uint32_t dmaster;
+};
+
+struct ctdb_req_dmaster {
+       struct ctdb_req_header hdr;
+       uint32_t dmaster;
+       uint32_t keylen;
+       uint32_t datalen;
+       uint8_t  data[0];
+};
+
+struct ctdb_reply_dmaster {
+       struct ctdb_req_header hdr;
+       uint32_t datalen;
+       uint8_t  data[0];
+};
+
+/* internal prototypes */
+void ctdb_set_error(struct ctdb_context *ctdb, const char *fmt, ...);
+void ctdb_fatal(struct ctdb_context *ctdb, const char *msg);
+bool ctdb_same_address(struct ctdb_address *a1, struct ctdb_address *a2);
+int ctdb_parse_address(struct ctdb_context *ctdb,
+                      TALLOC_CTX *mem_ctx, const char *str,
+                      struct ctdb_address *address);
+uint32_t ctdb_hash(const TDB_DATA *key);
+void ctdb_request_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_request_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_dmaster(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_call(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_error(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+void ctdb_reply_redirect(struct ctdb_context *ctdb, struct ctdb_req_header *hdr);
+
+uint32_t ctdb_lmaster(struct ctdb_context *ctdb, const TDB_DATA *key);
+int ctdb_ltdb_fetch(struct ctdb_context *ctdb, 
+                   TDB_DATA key, struct ctdb_ltdb_header *header, TDB_DATA *data);
+int ctdb_ltdb_store(struct ctdb_context *ctdb, TDB_DATA key, 
+                   struct ctdb_ltdb_header *header, TDB_DATA data);
+
+
+#endif
diff --git a/source/cluster/ctdb/tcp/ctdb_tcp.h b/source/cluster/ctdb/tcp/ctdb_tcp.h
new file mode 100644 (file)
index 0000000..0f8ce30
--- /dev/null
@@ -0,0 +1,76 @@
+/* 
+   ctdb database library
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+
+/* ctdb_tcp main state */
+struct ctdb_tcp {
+       int listen_fd;
+};
+
+/*
+  incoming packet structure - only used when we get a partial packet
+  on read
+*/
+struct ctdb_tcp_partial {
+       uint8_t *data;
+       uint32_t length;
+};
+
+
+/*
+  state associated with an incoming connection
+*/
+struct ctdb_incoming {
+       struct ctdb_context *ctdb;
+       int fd;
+       struct ctdb_tcp_partial partial;
+};
+
+/*
+  outgoing packet structure - only allocated when we can't write immediately
+  to the socket
+*/
+struct ctdb_tcp_packet {
+       struct ctdb_tcp_packet *next, *prev;
+       uint8_t *data;
+       uint32_t length;
+};
+
+/*
+  state associated with one tcp node
+*/
+struct ctdb_tcp_node {
+       int fd;
+       struct fd_event *fde;
+       struct ctdb_tcp_packet *queue;
+};
+
+
+/* prototypes internal to tcp transport */
+void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
+                        uint16_t flags, void *private);
+void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
+                           uint16_t flags, void *private);
+int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length);
+int ctdb_tcp_listen(struct ctdb_context *ctdb);
+void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, 
+                          struct timeval t, void *private);
+
+#define CTDB_TCP_ALIGNMENT 8
diff --git a/source/cluster/ctdb/tcp/tcp_connect.c b/source/cluster/ctdb/tcp/tcp_connect.c
new file mode 100644 (file)
index 0000000..2404144
--- /dev/null
@@ -0,0 +1,191 @@
+/* 
+   ctdb over TCP
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+#include "ctdb_tcp.h"
+
+static void set_nonblocking(int fd)
+{
+       unsigned v;
+       v = fcntl(fd, F_GETFL, 0);
+        fcntl(fd, F_SETFL, v | O_NONBLOCK);
+}
+
+
+/*
+  called when socket becomes writeable on connect
+*/
+static void ctdb_node_connect_write(struct event_context *ev, struct fd_event *fde, 
+                                   uint16_t flags, void *private)
+{
+       struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
+                                                     struct ctdb_tcp_node);
+       struct ctdb_context *ctdb = node->ctdb;
+       int error = 0;
+       socklen_t len = sizeof(error);
+
+       if (getsockopt(tnode->fd, SOL_SOCKET, SO_ERROR, &error, &len) != 0 ||
+           error != 0) {
+               talloc_free(fde);
+               close(tnode->fd);
+               tnode->fd = -1;
+               event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), 
+                               ctdb_tcp_node_connect, node);
+               return;
+       }
+
+       talloc_free(fde);
+       tnode->fde = event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_READ, 
+                                 ctdb_tcp_node_write, node);
+
+       /* tell the ctdb layer we are connected */
+       node->ctdb->upcalls->node_connected(node);
+
+       if (tnode->queue) {
+               EVENT_FD_WRITEABLE(tnode->fde);         
+       }
+}
+
+/*
+  called when we should try and establish a tcp connection to a node
+*/
+void ctdb_tcp_node_connect(struct event_context *ev, struct timed_event *te, 
+                          struct timeval t, void *private)
+{
+       struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
+                                                     struct ctdb_tcp_node);
+       struct ctdb_context *ctdb = node->ctdb;
+        struct sockaddr_in sock_out;
+
+       tnode->fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+
+       set_nonblocking(tnode->fd);
+
+       inet_pton(AF_INET, node->address.address, &sock_out.sin_addr);
+       sock_out.sin_port = htons(node->address.port);
+       sock_out.sin_family = PF_INET;
+       
+       if (connect(tnode->fd, (struct sockaddr *)&sock_out, sizeof(sock_out)) != 0 &&
+           errno != EINPROGRESS) {
+               /* try again once a second */
+               close(tnode->fd);
+               event_add_timed(ctdb->ev, node, timeval_current_ofs(1, 0), 
+                               ctdb_tcp_node_connect, node);
+               return;
+       }
+
+       /* non-blocking connect - wait for write event */
+       event_add_fd(node->ctdb->ev, node, tnode->fd, EVENT_FD_WRITE|EVENT_FD_READ, 
+                    ctdb_node_connect_write, node);
+}
+
+/*
+  destroy a ctdb_incoming structure 
+*/
+static int ctdb_incoming_destructor(struct ctdb_incoming *in)
+{
+       close(in->fd);
+       in->fd = -1;
+       return 0;
+}
+
+/*
+  called when we get contacted by another node
+  currently makes no attempt to check if the connection is really from a ctdb
+  node in our cluster
+*/
+static void ctdb_listen_event(struct event_context *ev, struct fd_event *fde, 
+                             uint16_t flags, void *private)
+{
+       struct ctdb_context *ctdb;
+       struct ctdb_tcp *ctcp;
+       struct sockaddr_in addr;
+       socklen_t len;
+       int fd;
+       struct ctdb_incoming *in;
+
+       ctdb = talloc_get_type(private, struct ctdb_context);
+       ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
+       memset(&addr, 0, sizeof(addr));
+       len = sizeof(addr);
+       fd = accept(ctcp->listen_fd, (struct sockaddr *)&addr, &len);
+       if (fd == -1) return;
+
+       in = talloc_zero(ctdb, struct ctdb_incoming);
+       in->fd = fd;
+       in->ctdb = ctdb;
+
+       set_nonblocking(in->fd);
+
+       event_add_fd(ctdb->ev, in, in->fd, EVENT_FD_READ, 
+                    ctdb_tcp_incoming_read, in);       
+
+       talloc_set_destructor(in, ctdb_incoming_destructor);
+}
+
+
+/*
+  listen on our own address
+*/
+int ctdb_tcp_listen(struct ctdb_context *ctdb)
+{
+       struct ctdb_tcp *ctcp = talloc_get_type(ctdb->private, struct ctdb_tcp);
+        struct sockaddr_in sock;
+       int one = 1;
+
+        sock.sin_port = htons(ctdb->address.port);
+        sock.sin_family = PF_INET;
+       inet_pton(AF_INET, ctdb->address.address, &sock.sin_addr);
+
+        ctcp->listen_fd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
+        if (ctcp->listen_fd == -1) {
+               ctdb_set_error(ctdb, "socket failed\n");
+               return -1;
+        }
+
+        setsockopt(ctcp->listen_fd,SOL_SOCKET,SO_REUSEADDR,(char *)&one,sizeof(one));
+
+        if (bind(ctcp->listen_fd, (struct sockaddr * )&sock, sizeof(sock)) != 0) {
+               ctdb_set_error(ctdb, "bind failed\n");
+               close(ctcp->listen_fd);
+               ctcp->listen_fd = -1;
+                return -1;
+        }
+
+       if (listen(ctcp->listen_fd, 10) == -1) {
+               ctdb_set_error(ctdb, "listen failed\n");
+               close(ctcp->listen_fd);
+               ctcp->listen_fd = -1;
+               return -1;
+       }
+
+       event_add_fd(ctdb->ev, ctdb, ctcp->listen_fd, EVENT_FD_READ, 
+                    ctdb_listen_event, ctdb);  
+
+       return 0;
+}
+
diff --git a/source/cluster/ctdb/tcp/tcp_init.c b/source/cluster/ctdb/tcp/tcp_init.c
new file mode 100644 (file)
index 0000000..b8ee8cb
--- /dev/null
@@ -0,0 +1,102 @@
+/* 
+   ctdb over TCP
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/tdb/include/tdb.h"
+#include "lib/events/events.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+#include "ctdb_tcp.h"
+
+/*
+  start the protocol going
+*/
+int ctdb_tcp_start(struct ctdb_context *ctdb)
+{
+       int i;
+
+       /* listen on our own address */
+       if (ctdb_tcp_listen(ctdb) != 0) return -1;
+
+       /* startup connections to the other servers - will happen on
+          next event loop */
+       for (i=0;i<ctdb->num_nodes;i++) {
+               struct ctdb_node *node = *(ctdb->nodes + i);
+               if (!(ctdb->flags & CTDB_FLAG_SELF_CONNECT) &&
+                   ctdb_same_address(&ctdb->address, &node->address)) continue;
+               event_add_timed(ctdb->ev, node, timeval_zero(), 
+                               ctdb_tcp_node_connect, node);
+       }
+
+       return 0;
+}
+
+
+/*
+  initialise tcp portion of a ctdb node 
+*/
+int ctdb_tcp_add_node(struct ctdb_node *node)
+{
+       struct ctdb_tcp_node *tnode;
+       tnode = talloc_zero(node, struct ctdb_tcp_node);
+       CTDB_NO_MEMORY(node->ctdb, tnode);
+
+       tnode->fd = -1;
+       node->private = tnode;
+       return 0;
+}
+
+
+/*
+  transport packet allocator - allows transport to control memory for packets
+*/
+void *ctdb_tcp_allocate_pkt(struct ctdb_context *ctdb, size_t size)
+{
+       /* tcp transport needs to round to 8 byte alignment to ensure
+          that we can use a length header and 64 bit elements in
+          structures */
+       size = (size+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
+       return talloc_size(ctdb, size);
+}
+
+
+static const struct ctdb_methods ctdb_tcp_methods = {
+       .start     = ctdb_tcp_start,
+       .add_node  = ctdb_tcp_add_node,
+       .queue_pkt = ctdb_tcp_queue_pkt,
+       .allocate_pkt = ctdb_tcp_allocate_pkt
+};
+
+/*
+  initialise tcp portion of ctdb 
+*/
+int ctdb_tcp_init(struct ctdb_context *ctdb)
+{
+       struct ctdb_tcp *ctcp;
+       ctcp = talloc_zero(ctdb, struct ctdb_tcp);
+       CTDB_NO_MEMORY(ctdb, ctcp);
+
+       ctcp->listen_fd = -1;
+       ctdb->private = ctcp;
+       ctdb->methods = &ctdb_tcp_methods;
+       return 0;
+}
+
diff --git a/source/cluster/ctdb/tcp/tcp_io.c b/source/cluster/ctdb/tcp/tcp_io.c
new file mode 100644 (file)
index 0000000..82e24f7
--- /dev/null
@@ -0,0 +1,254 @@
+/* 
+   ctdb over TCP
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "lib/util/dlinklist.h"
+#include "lib/tdb/include/tdb.h"
+#include "system/network.h"
+#include "system/filesys.h"
+#include "cluster/ctdb/include/ctdb_private.h"
+#include "ctdb_tcp.h"
+
+
+/*
+  called when we fail to send a message to a node
+*/
+static void ctdb_tcp_node_dead(struct event_context *ev, struct timed_event *te, 
+                              struct timeval t, void *private)
+{
+       struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
+                                                     struct ctdb_tcp_node);
+
+       /* flush the queue */
+       while (tnode->queue) {
+               struct ctdb_tcp_packet *pkt = tnode->queue;
+               DLIST_REMOVE(tnode->queue, pkt);
+               talloc_free(pkt);
+       }
+
+       /* start a new connect cycle to try to re-establish the
+          link */
+       talloc_free(tnode->fde);
+       close(tnode->fd);
+       tnode->fd = -1;
+       event_add_timed(node->ctdb->ev, node, timeval_zero(), 
+                       ctdb_tcp_node_connect, node);
+}
+
+/*
+  called when socket becomes readable
+*/
+void ctdb_tcp_node_write(struct event_context *ev, struct fd_event *fde, 
+                        uint16_t flags, void *private)
+{
+       struct ctdb_node *node = talloc_get_type(private, struct ctdb_node);
+       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
+                                                     struct ctdb_tcp_node);
+       if (flags & EVENT_FD_READ) {
+               /* getting a read event on this fd in the current tcp model is
+                  always an error, as we have separate read and write
+                  sockets. In future we may combine them, but for now it must
+                  mean that the socket is dead, so we try to reconnect */
+               talloc_free(tnode->fde);
+               close(tnode->fd);
+               tnode->fd = -1;
+               event_add_timed(node->ctdb->ev, node, timeval_zero(), 
+                               ctdb_tcp_node_connect, node);
+               return;
+       }
+
+       while (tnode->queue) {
+               struct ctdb_tcp_packet *pkt = tnode->queue;
+               ssize_t n;
+
+               n = write(tnode->fd, pkt->data, pkt->length);
+
+               if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       event_add_timed(node->ctdb->ev, node, timeval_zero(), 
+                                       ctdb_tcp_node_dead, node);
+                       EVENT_FD_NOT_WRITEABLE(tnode->fde);
+                       return;
+               }
+               if (n <= 0) return;
+               
+               if (n != pkt->length) {
+                       pkt->length -= n;
+                       pkt->data += n;
+                       return;
+               }
+
+               DLIST_REMOVE(tnode->queue, pkt);
+               talloc_free(pkt);
+       }
+
+       EVENT_FD_NOT_WRITEABLE(tnode->fde);
+}
+
+
+/*
+  called when an incoming connection is readable
+*/
+void ctdb_tcp_incoming_read(struct event_context *ev, struct fd_event *fde, 
+                           uint16_t flags, void *private)
+{
+       struct ctdb_incoming *in = talloc_get_type(private, struct ctdb_incoming);
+       int num_ready = 0;
+       ssize_t nread;
+       uint8_t *data, *data_base;
+
+       if (ioctl(in->fd, FIONREAD, &num_ready) != 0 ||
+           num_ready == 0) {
+               /* we've lost the link from another node. We don't
+                  notify the upper layers, as we only want to trigger
+                  a full node reorganisation when a send fails - that
+                  allows nodes to restart without penalty as long as
+                  the network is idle */
+               talloc_free(in);
+               return;
+       }
+
+       in->partial.data = talloc_realloc_size(in, in->partial.data, 
+                                              num_ready + in->partial.length);
+       if (in->partial.data == NULL) {
+               /* not much we can do except drop the socket */
+               talloc_free(in);
+               return;
+       }
+
+       nread = read(in->fd, in->partial.data+in->partial.length, num_ready);
+       if (nread <= 0) {
+               /* the connection must be dead */
+               talloc_free(in);
+               return;
+       }
+
+       data = in->partial.data;
+       nread += in->partial.length;
+
+       in->partial.data = NULL;
+       in->partial.length = 0;
+
+       if (nread >= 4 && *(uint32_t *)data == nread) {
+               /* most common case - we got a whole packet in one go
+                  tell the ctdb layer above that we have a packet */
+               in->ctdb->upcalls->recv_pkt(in->ctdb, data, nread);
+               return;
+       }
+
+       data_base = data;
+
+       while (nread >= 4 && *(uint32_t *)data <= nread) {
+               /* we have at least one packet */
+               uint8_t *d2;
+               uint32_t len;
+               len = *(uint32_t *)data;
+               d2 = talloc_memdup(in, data, len);
+               if (d2 == NULL) {
+                       /* sigh */
+                       talloc_free(in);
+                       return;
+               }
+               in->ctdb->upcalls->recv_pkt(in->ctdb, d2, len);
+               data += len;
+               nread -= len;           
+               return;
+       }
+
+       if (nread < 4 || *(uint32_t *)data > nread) {
+               /* we have only part of a packet */
+               if (data_base == data) {
+                       in->partial.data = data;
+                       in->partial.length = nread;
+               } else {
+                       in->partial.data = talloc_memdup(in, data, nread);
+                       if (in->partial.data == NULL) {
+                               talloc_free(in);
+                               return;
+                       }
+                       in->partial.length = nread;
+                       talloc_free(data_base);
+               }
+               return;
+       }
+
+       talloc_free(data_base);
+}
+
+/*
+  queue a packet for sending
+*/
+int ctdb_tcp_queue_pkt(struct ctdb_node *node, uint8_t *data, uint32_t length)
+{
+       struct ctdb_tcp_node *tnode = talloc_get_type(node->private, 
+                                                     struct ctdb_tcp_node);
+       struct ctdb_tcp_packet *pkt;
+       uint32_t length2;
+
+       /* enforce the length and alignment rules from the tcp packet allocator */
+       length2 = (length+(CTDB_TCP_ALIGNMENT-1)) & ~(CTDB_TCP_ALIGNMENT-1);
+       *(uint32_t *)data = length2;
+
+       if (length2 != length) {
+               memset(data+length, 0, length2-length);
+       }
+       {
+               int i, fd = open("/dev/null", O_WRONLY);
+               for (i=0;i<length2;i++) {
+                       write(fd, &data[i], 1);
+               }
+               close(fd);
+       }
+       
+       /* if the queue is empty then try an immediate write, avoiding
+          queue overhead. This relies on non-blocking sockets */
+       if (tnode->queue == NULL && tnode->fd != -1) {
+               ssize_t n = write(tnode->fd, data, length2);
+               if (n == -1 && errno != EAGAIN && errno != EWOULDBLOCK) {
+                       event_add_timed(node->ctdb->ev, node, timeval_zero(), 
+                                       ctdb_tcp_node_dead, node);
+                       /* yes, we report success, as the dead node is 
+                          handled via a separate event */
+                       return 0;
+               }
+               if (n > 0) {
+                       data += n;
+                       length2 -= n;
+               }
+               if (length2 == 0) return 0;
+       }
+
+       pkt = talloc(tnode, struct ctdb_tcp_packet);
+       CTDB_NO_MEMORY(node->ctdb, pkt);
+
+       pkt->data = talloc_memdup(pkt, data, length2);
+       CTDB_NO_MEMORY(node->ctdb, pkt->data);
+
+       pkt->length = length2;
+
+       if (tnode->queue == NULL && tnode->fd != -1) {
+               EVENT_FD_WRITEABLE(tnode->fde);
+       }
+
+       DLIST_ADD_END(tnode->queue, pkt, struct ctdb_tcp_packet *);
+
+       return 0;
+}
diff --git a/source/cluster/ctdb/tests/ctdb_bench.c b/source/cluster/ctdb/tests/ctdb_bench.c
new file mode 100644 (file)
index 0000000..01a8cc0
--- /dev/null
@@ -0,0 +1,228 @@
+/* 
+   simple ctdb benchmark
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "popt.h"
+
+#include <sys/time.h>
+#include <time.h>
+
+static struct timeval tp1,tp2;
+
+static void start_timer(void)
+{
+       gettimeofday(&tp1,NULL);
+}
+
+static double end_timer(void)
+{
+       gettimeofday(&tp2,NULL);
+       return (tp2.tv_sec + (tp2.tv_usec*1.0e-6)) - 
+               (tp1.tv_sec + (tp1.tv_usec*1.0e-6));
+}
+
+
+static int timelimit = 10;
+static int num_records = 10;
+static int num_repeats = 100;
+
+enum my_functions {FUNC_INCR=1, FUNC_FETCH=2};
+
+/*
+  ctdb call function to increment an integer
+*/
+static int incr_func(struct ctdb_call *call)
+{
+       if (call->record_data.dsize == 0) {
+               call->new_data = talloc(call, TDB_DATA);
+               if (call->new_data == NULL) {
+                       return CTDB_ERR_NOMEM;
+               }
+               call->new_data->dptr = talloc_size(call, 4);
+               call->new_data->dsize = 4;
+               *(uint32_t *)call->new_data->dptr = 0;
+       } else {
+               call->new_data = &call->record_data;
+       }
+       (*(uint32_t *)call->new_data->dptr)++;
+       return 0;
+}
+
+/*
+  ctdb call function to fetch a record
+*/
+static int fetch_func(struct ctdb_call *call)
+{
+       call->reply_data = &call->record_data;
+       return 0;
+}
+
+/*
+  benchmark incrementing an integer
+*/
+static void bench_incr(struct ctdb_context *ctdb)
+{
+       TDB_DATA key, data;
+       int loops=0;
+       int ret, i;
+
+       start_timer();
+
+       while (1) {
+               uint32_t v = loops % num_records;
+               key.dptr = &v;
+               key.dsize = 4;
+               for (i=0;i<num_repeats;i++) {
+                       ret = ctdb_call(ctdb, key, FUNC_INCR, NULL, NULL);
+                       if (ret != 0) {
+                               printf("incr call failed - %s\n", ctdb_errstr(ctdb));
+                               return;
+                       }
+               }
+               if (num_repeats * (++loops) % 10000 == 0) {
+                       if (end_timer() > timelimit) break;
+                       printf("Incr: %.2f ops/sec\r", num_repeats*loops/end_timer());
+                       fflush(stdout);
+               }
+       }
+
+       ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data);
+       if (ret == -1) {
+               printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb));
+               return;
+       }
+
+       printf("Incr: %.2f ops/sec (loops=%d val=%d)\n", 
+              num_repeats*loops/end_timer(), loops, *(uint32_t *)data.dptr);
+}
+
+/*
+  main program
+*/
+int main(int argc, const char *argv[])
+{
+       struct ctdb_context *ctdb;
+       const char *nlist = NULL;
+       const char *transport = "tcp";
+       const char *myaddress = NULL;
+       int self_connect=0;
+
+       struct poptOption popt_options[] = {
+               POPT_AUTOHELP
+               { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
+               { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
+               { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
+               { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
+               { "timelimit", 't', POPT_ARG_INT, &timelimit, 0, "timelimit", "integer" },
+               { "num-records", 'r', POPT_ARG_INT, &num_records, 0, "num_records", "integer" },
+               POPT_TABLEEND
+       };
+       int opt;
+       const char **extra_argv;
+       int extra_argc = 0;
+       int ret;
+       poptContext pc;
+       struct event_context *ev;
+
+       pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
+
+       while ((opt = poptGetNextOpt(pc)) != -1) {
+               switch (opt) {
+               default:
+                       fprintf(stderr, "Invalid option %s: %s\n", 
+                               poptBadOption(pc, 0), poptStrerror(opt));
+                       exit(1);
+               }
+       }
+
+       /* setup the remaining options for the main program to use */
+       extra_argv = poptGetArgs(pc);
+       if (extra_argv) {
+               extra_argv++;
+               while (extra_argv[extra_argc]) extra_argc++;
+       }
+
+       if (nlist == NULL || myaddress == NULL) {
+               printf("You must provide a node list with --nlist and an address with --listen\n");
+               exit(1);
+       }
+
+       ev = event_context_init(NULL);
+
+       /* initialise ctdb */
+       ctdb = ctdb_init(ev);
+       if (ctdb == NULL) {
+               printf("Failed to init ctdb\n");
+               exit(1);
+       }
+
+       if (self_connect) {
+               ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
+       }
+
+       ret = ctdb_set_transport(ctdb, transport);
+       if (ret == -1) {
+               printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* tell ctdb what address to listen on */
+       ret = ctdb_set_address(ctdb, myaddress);
+       if (ret == -1) {
+               printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* tell ctdb what nodes are available */
+       ret = ctdb_set_nlist(ctdb, nlist);
+       if (ret == -1) {
+               printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* setup a ctdb call function */
+       ret = ctdb_set_call(ctdb, incr_func,  FUNC_INCR);
+       ret = ctdb_set_call(ctdb, fetch_func, FUNC_FETCH);
+
+       /* attach to a specific database */
+       ret = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       if (ret == -1) {
+               printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* start the protocol running */
+       ret = ctdb_start(ctdb);
+
+       /* wait until all nodes are connected (should not be needed
+          outside of test code) */
+       ctdb_connect_wait(ctdb);
+
+       bench_incr(ctdb);
+       
+       /* go into a wait loop to allow other nodes to complete */
+       ctdb_wait_loop(ctdb);
+
+       /* shut it down */
+       talloc_free(ctdb);
+       return 0;
+}
diff --git a/source/cluster/ctdb/tests/ctdb_test.c b/source/cluster/ctdb/tests/ctdb_test.c
new file mode 100644 (file)
index 0000000..5bc35ad
--- /dev/null
@@ -0,0 +1,207 @@
+/* 
+   ctdb test harness
+
+   Copyright (C) Andrew Tridgell  2006
+
+   This library is free software; you can redistribute it and/or
+   modify it under the terms of the GNU Lesser General Public
+   License as published by the Free Software Foundation; either
+   version 2 of the License, or (at your option) any later version.
+
+   This library is distributed in the hope that it will be useful,
+   but WITHOUT ANY WARRANTY; without even the implied warranty of
+   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+   Lesser General Public License for more details.
+
+   You should have received a copy of the GNU Lesser General Public
+   License along with this library; if not, write to the Free Software
+   Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
+*/
+
+#include "includes.h"
+#include "lib/events/events.h"
+#include "system/filesys.h"
+#include "popt.h"
+
+enum my_functions {FUNC_SORT=1, FUNC_FETCH=2};
+
+static int int_compare(int *i1, int *i2)
+{
+       return *i1 - *i2;
+}
+
+/*
+  add an integer into a record in sorted order
+*/
+static int sort_func(struct ctdb_call *call)
+{
+       if (call->call_data == NULL ||
+           call->call_data->dsize != sizeof(int)) {
+               return CTDB_ERR_INVALID;
+       }
+       call->new_data = talloc(call, TDB_DATA);
+       if (call->new_data == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       call->new_data->dptr = talloc_size(call, 
+                                          call->record_data.dsize + 
+                                          call->call_data->dsize);
+       if (call->new_data->dptr == NULL) {
+               return CTDB_ERR_NOMEM;
+       }
+       call->new_data->dsize = call->record_data.dsize + call->call_data->dsize;
+       memcpy(call->new_data->dptr,
+              call->record_data.dptr, call->record_data.dsize);
+       memcpy(call->new_data->dptr+call->record_data.dsize,
+              call->call_data->dptr, call->call_data->dsize);
+
+       qsort(call->new_data->dptr, call->new_data->dsize / sizeof(int),
+             sizeof(int), (comparison_fn_t)int_compare);
+
+       return 0;
+}
+
+/*
+  ctdb call function to fetch a record
+*/
+static int fetch_func(struct ctdb_call *call)
+{
+       call->reply_data = &call->record_data;
+       return 0;
+}
+
+/*
+  main program
+*/
+int main(int argc, const char *argv[])
+{
+       struct ctdb_context *ctdb;
+       const char *nlist = NULL;
+       const char *transport = "tcp";
+       const char *myaddress = NULL;
+       int self_connect=0;
+
+       struct poptOption popt_options[] = {
+               POPT_AUTOHELP
+               { "nlist", 0, POPT_ARG_STRING, &nlist, 0, "node list file", "filename" },
+               { "listen", 0, POPT_ARG_STRING, &myaddress, 0, "address to listen on", "address" },
+               { "transport", 0, POPT_ARG_STRING, &transport, 0, "protocol transport", NULL },
+               { "self-connect", 0, POPT_ARG_NONE, &self_connect, 0, "enable self connect", "boolean" },
+               POPT_TABLEEND
+       };
+       int opt;
+       const char **extra_argv;
+       int extra_argc = 0;
+       int i, ret;
+       TDB_DATA key, data;
+       poptContext pc;
+       struct event_context *ev;
+
+       pc = poptGetContext(argv[0], argc, argv, popt_options, POPT_CONTEXT_KEEP_FIRST);
+
+       while ((opt = poptGetNextOpt(pc)) != -1) {
+               switch (opt) {
+               default:
+                       fprintf(stderr, "Invalid option %s: %s\n", 
+                               poptBadOption(pc, 0), poptStrerror(opt));
+                       exit(1);
+               }
+       }
+
+       /* setup the remaining options for the main program to use */
+       extra_argv = poptGetArgs(pc);
+       if (extra_argv) {
+               extra_argv++;
+               while (extra_argv[extra_argc]) extra_argc++;
+       }
+
+       if (nlist == NULL || myaddress == NULL) {
+               printf("You must provide a node list with --nlist and an address with --listen\n");
+               exit(1);
+       }
+
+       ev = event_context_init(NULL);
+
+       /* initialise ctdb */
+       ctdb = ctdb_init(ev);
+       if (ctdb == NULL) {
+               printf("Failed to init ctdb\n");
+               exit(1);
+       }
+
+       if (self_connect) {
+               ctdb_set_flags(ctdb, CTDB_FLAG_SELF_CONNECT);
+       }
+
+       ret = ctdb_set_transport(ctdb, transport);
+       if (ret == -1) {
+               printf("ctdb_set_transport failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* tell ctdb what address to listen on */
+       ret = ctdb_set_address(ctdb, myaddress);
+       if (ret == -1) {
+               printf("ctdb_set_address failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* tell ctdb what nodes are available */
+       ret = ctdb_set_nlist(ctdb, nlist);
+       if (ret == -1) {
+               printf("ctdb_set_nlist failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* setup a ctdb call function */
+       ret = ctdb_set_call(ctdb, sort_func,  FUNC_SORT);
+       ret = ctdb_set_call(ctdb, fetch_func, FUNC_FETCH);
+
+       /* attach to a specific database */
+       ret = ctdb_attach(ctdb, "test.tdb", TDB_DEFAULT, O_RDWR|O_CREAT|O_TRUNC, 0666);
+       if (ret == -1) {
+               printf("ctdb_attach failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       /* start the protocol running */
+       ret = ctdb_start(ctdb);
+
+       /* wait until all nodes are connected (should not be needed
+          outide of test code) */
+       ctdb_connect_wait(ctdb);
+       
+       key.dptr = "test";
+       key.dsize = strlen("test")+1;
+
+       /* add some random data */
+       for (i=0;i<10;i++) {
+               int v = random() % 1000;
+               data.dptr = (uint8_t *)&v;
+               data.dsize = sizeof(v);
+               ret = ctdb_call(ctdb, key, FUNC_SORT, &data, NULL);
+               if (ret == -1) {
+                       printf("ctdb_call FUNC_SORT failed - %s\n", ctdb_errstr(ctdb));
+                       exit(1);
+               }
+       }
+
+       /* fetch the record */
+       ret = ctdb_call(ctdb, key, FUNC_FETCH, NULL, &data);
+       if (ret == -1) {
+               printf("ctdb_call FUNC_FETCH failed - %s\n", ctdb_errstr(ctdb));
+               exit(1);
+       }
+
+       for (i=0;i<data.dsize/sizeof(int);i++) {
+               printf("%3d\n", ((int *)data.dptr)[i]);
+       }
+       talloc_free(data.dptr);
+
+       /* go into a wait loop to allow other nodes to complete */
+       ctdb_wait_loop(ctdb);
+
+       /* shut it down */
+       talloc_free(ctdb);
+       return 0;
+}