along with this program; if not, see <http://www.gnu.org/licenses/>.
*/
-#include "includes.h"
-#include "lib/tevent/tevent.h"
-#include "lib/tdb/include/tdb.h"
+#include "replace.h"
#include "system/network.h"
#include "system/filesys.h"
#include "system/dir.h"
#include "system/time.h"
-#include "../include/ctdb_private.h"
-#include "../common/rb_tree.h"
-#include "db_wrap.h"
-#include "lib/util/dlinklist.h"
-#include <ctype.h>
+#include "system/locale.h"
-#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
+#include <talloc.h>
+#include <tevent.h>
-/*
- this is the dummy null procedure that all databases support
-*/
-static int ctdb_null_func(struct ctdb_call_info *call)
-{
- return 0;
-}
+#include "lib/tdb_wrap/tdb_wrap.h"
+#include "lib/util/dlinklist.h"
+#include "lib/util/debug.h"
+#include "lib/util/samba_util.h"
-/*
- this is a plain fetch procedure that all databases support
-*/
-static int ctdb_fetch_func(struct ctdb_call_info *call)
-{
- call->reply_data = &call->record_data;
- return 0;
-}
+#include "ctdb_private.h"
+#include "ctdb_client.h"
+
+#include "common/rb_tree.h"
+#include "common/reqid.h"
+#include "common/system.h"
+#include "common/common.h"
+#include "common/logging.h"
+#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
/**
* write a record to a normal database
TDB_DATA data)
{
struct ctdb_context *ctdb = ctdb_db->ctdb;
- TDB_DATA rec;
+ TDB_DATA rec[2];
+ uint32_t hsize = sizeof(struct ctdb_ltdb_header);
int ret;
bool seqnum_suppressed = false;
bool keep = false;
bool schedule_for_deletion = false;
+ bool remove_from_delete_queue = false;
uint32_t lmaster;
if (ctdb->flags & CTDB_FLAG_TORTURE) {
+ TDB_DATA old;
struct ctdb_ltdb_header *h2;
- rec = tdb_fetch(ctdb_db->ltdb->tdb, key);
- h2 = (struct ctdb_ltdb_header *)rec.dptr;
- if (rec.dptr && rec.dsize >= sizeof(h2) && h2->rsn > header->rsn) {
- DEBUG(DEBUG_CRIT,("RSN regression! %llu %llu\n",
- (unsigned long long)h2->rsn, (unsigned long long)header->rsn));
+
+ old = tdb_fetch(ctdb_db->ltdb->tdb, key);
+ h2 = (struct ctdb_ltdb_header *)old.dptr;
+ if (old.dptr != NULL &&
+ old.dsize >= hsize &&
+ h2->rsn > header->rsn) {
+ DEBUG(DEBUG_ERR,
+ ("RSN regression! %"PRIu64" %"PRIu64"\n",
+ h2->rsn, header->rsn));
+ }
+ if (old.dptr) {
+ free(old.dptr);
}
- if (rec.dptr) free(rec.dptr);
}
if (ctdb->vnn_map == NULL) {
*/
if (data.dsize != 0) {
keep = true;
+ } else if (header->flags & CTDB_REC_RO_FLAGS) {
+ keep = true;
} else if (ctdb_db->persistent) {
keep = true;
} else if (header->flags & CTDB_REC_FLAG_AUTOMATIC) {
keep = true;
}
- if (keep &&
- (data.dsize == 0) &&
- !ctdb_db->persistent &&
- (ctdb_db->ctdb->pnn == header->dmaster))
- {
- schedule_for_deletion = true;
+ if (keep) {
+ if (!ctdb_db->persistent &&
+ (ctdb_db->ctdb->pnn == header->dmaster) &&
+ !(header->flags & CTDB_REC_RO_FLAGS))
+ {
+ header->rsn++;
+
+ if (data.dsize == 0) {
+ schedule_for_deletion = true;
+ }
+ }
+ remove_from_delete_queue = !schedule_for_deletion;
}
store:
*/
header->flags &= ~CTDB_REC_FLAG_AUTOMATIC;
- rec.dsize = sizeof(*header) + data.dsize;
- rec.dptr = talloc_size(ctdb, rec.dsize);
- CTDB_NO_MEMORY(ctdb, rec.dptr);
+ rec[0].dsize = hsize;
+ rec[0].dptr = (uint8_t *)header;
- memcpy(rec.dptr, header, sizeof(*header));
- memcpy(rec.dptr + sizeof(*header), data.dptr, data.dsize);
+ rec[1].dsize = data.dsize;
+ rec[1].dptr = data.dptr;
/* Databases with seqnum updates enabled only get their seqnum
changes when/if we modify the data */
TDB_DATA old;
old = tdb_fetch(ctdb_db->ltdb->tdb, key);
- if ( (old.dsize == rec.dsize)
- && !memcmp(old.dptr+sizeof(struct ctdb_ltdb_header),
- rec.dptr+sizeof(struct ctdb_ltdb_header),
- rec.dsize-sizeof(struct ctdb_ltdb_header)) ) {
+ if ((old.dsize == hsize + data.dsize) &&
+ memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
seqnum_suppressed = true;
}
- if (old.dptr) free(old.dptr);
+ if (old.dptr != NULL) {
+ free(old.dptr);
+ }
}
DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
ctdb_hash(&key)));
if (keep) {
- ret = tdb_store(ctdb_db->ltdb->tdb, key, rec, TDB_REPLACE);
+ ret = tdb_storev(ctdb_db->ltdb->tdb, key, rec, 2, TDB_REPLACE);
} else {
ret = tdb_delete(ctdb_db->ltdb->tdb, key);
}
tdb_errorstr(ctdb_db->ltdb->tdb)));
schedule_for_deletion = false;
+ remove_from_delete_queue = false;
}
if (seqnum_suppressed) {
tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
}
- talloc_free(rec.dptr);
-
if (schedule_for_deletion) {
int ret2;
ret2 = ctdb_local_schedule_for_deletion(ctdb_db, header, key);
- if (ret != 0) {
+ if (ret2 != 0) {
DEBUG(DEBUG_ERR, (__location__ " ctdb_local_schedule_for_deletion failed.\n"));
}
}
+ if (remove_from_delete_queue) {
+ ctdb_local_remove_from_delete_queue(ctdb_db, header, key);
+ }
+
return ret;
}
struct lock_fetch_state {
struct ctdb_context *ctdb;
+ struct ctdb_db_context *ctdb_db;
void (*recv_pkt)(void *, struct ctdb_req_header *);
void *recv_context;
struct ctdb_req_header *hdr;
/*
called when we should retry the operation
*/
-static void lock_fetch_callback(void *p)
+static void lock_fetch_callback(void *p, bool locked)
{
struct lock_fetch_state *state = talloc_get_type(p, struct lock_fetch_state);
if (!state->ignore_generation &&
- state->generation != state->ctdb->vnn_map->generation) {
+ state->generation != state->ctdb_db->generation) {
DEBUG(DEBUG_NOTICE,("Discarding previous generation lockwait packet\n"));
talloc_free(state->hdr);
return;
1) tries to get the chainlock. If it succeeds, then it returns 0
2) if it fails to get a chainlock immediately then it sets up a
- non-blocking chainlock via ctdb_lockwait, and when it gets the
+ non-blocking chainlock via ctdb_lock_record, and when it gets the
chainlock it re-submits this ctdb request to the main packet
- receive function
+ receive function.
This effectively queues all ctdb requests that cannot be
immediately satisfied until it can get the lock. This means that
{
int ret;
struct tdb_context *tdb = ctdb_db->ltdb->tdb;
- struct lockwait_handle *h;
+ struct lock_request *lreq;
struct lock_fetch_state *state;
ret = tdb_chainlock_nonblock(tdb, key);
state = talloc(hdr, struct lock_fetch_state);
state->ctdb = ctdb_db->ctdb;
+ state->ctdb_db = ctdb_db;
state->hdr = hdr;
state->recv_pkt = recv_pkt;
state->recv_context = recv_context;
- state->generation = ctdb_db->ctdb->vnn_map->generation;
+ state->generation = ctdb_db->generation;
state->ignore_generation = ignore_generation;
/* now the contended path */
- h = ctdb_lockwait(ctdb_db, key, lock_fetch_callback, state);
- if (h == NULL) {
+ lreq = ctdb_lock_record(state, ctdb_db, key, true, lock_fetch_callback, state);
+ if (lreq == NULL) {
return -1;
}
/* we need to move the packet off the temporary context in ctdb_input_pkt(),
so it won't be freed yet */
talloc_steal(state, hdr);
- talloc_steal(state, h);
/* now tell the caller than we will retry asynchronously */
return -2;
ctdb_db->db_path,
ctdb_db->unhealthy_reason));
}
- DEBUG((fail!=0)?DEBUG_ALERT:DEBUG_NOTICE,
- ("ctdb_recheck_presistent_health: OK[%d] FAIL[%d]\n",
+ DEBUG(DEBUG_NOTICE,
+ ("ctdb_recheck_persistent_health: OK[%d] FAIL[%d]\n",
ok, fail));
if (fail != 0) {
return -1;
}
- if (may_recover && !ctdb->done_startup) {
+ if (may_recover && ctdb->runstate == CTDB_RUNSTATE_STARTUP) {
DEBUG(DEBUG_ERR, (__location__ " db %s become healthy - force recovery for startup\n",
ctdb_db->db_name));
ctdb->recovery_mode = CTDB_RECOVERY_ACTIVE;
return 0;
}
+
+int ctdb_set_db_readonly(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+ char *ropath;
+
+ if (ctdb_db->readonly) {
+ return 0;
+ }
+
+ if (ctdb_db->persistent) {
+ DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
+ return -1;
+ }
+
+ ropath = talloc_asprintf(ctdb_db, "%s.RO", ctdb_db->db_path);
+ if (ropath == NULL) {
+ DEBUG(DEBUG_CRIT,("Failed to asprintf the tracking database\n"));
+ return -1;
+ }
+ ctdb_db->rottdb = tdb_open(ropath,
+ ctdb->tunable.database_hash_size,
+ TDB_NOLOCK|TDB_CLEAR_IF_FIRST|TDB_NOSYNC,
+ O_CREAT|O_RDWR, 0600);
+ if (ctdb_db->rottdb == NULL) {
+ DEBUG(DEBUG_CRIT,("Failed to open/create the tracking database '%s'\n", ropath));
+ talloc_free(ropath);
+ return -1;
+ }
+
+ DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
+
+ ctdb_db->readonly = true;
+
+ DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
+
+ talloc_free(ropath);
+ return 0;
+}
+
/*
attach to a database, handling both persistent and non-persistent databases
return 0 on success, -1 on failure
*/
static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
bool persistent, const char *unhealthy_reason,
- bool jenkinshash)
+ bool jenkinshash, bool mutexes)
{
struct ctdb_db_context *ctdb_db, *tmp_db;
int ret;
ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
CTDB_NO_MEMORY(ctdb, ctdb_db);
- ctdb_db->priority = 1;
ctdb_db->ctdb = ctdb;
ctdb_db->db_name = talloc_strdup(ctdb_db, db_name);
CTDB_NO_MEMORY(ctdb, ctdb_db->db_name);
if (ctdb->max_persistent_check_errors > 0) {
remaining_tries = 1;
}
- if (ctdb->done_startup) {
+ if (ctdb->runstate == CTDB_RUNSTATE_RUNNING) {
remaining_tries = 0;
}
if (jenkinshash) {
tdb_flags |= TDB_INCOMPATIBLE_HASH;
}
+#ifdef TDB_MUTEX_LOCKING
+ if (ctdb->tunable.mutex_enabled && mutexes &&
+ tdb_runtime_check_for_robust_mutexes()) {
+ tdb_flags |= (TDB_MUTEX_LOCKING | TDB_CLEAR_IF_FIRST);
+ }
+#endif
again:
- ctdb_db->ltdb = tdb_wrap_open(ctdb, ctdb_db->db_path,
+ ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
ctdb->tunable.database_hash_size,
tdb_flags,
O_CREAT|O_RDWR, mode);
}
}
+ /* set up a rb tree we can use to track which records we have a
+ fetch-lock in-flight for so we can defer any additional calls
+ for the same record.
+ */
+ ctdb_db->deferred_fetch = trbt_create(ctdb_db, 0);
+ if (ctdb_db->deferred_fetch == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to create deferred fetch rb tree for ctdb database\n"));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
+ ctdb_db->defer_dmaster = trbt_create(ctdb_db, 0);
+ if (ctdb_db->defer_dmaster == NULL) {
+ DEBUG(DEBUG_ERR, ("Failed to create defer dmaster rb tree for %s\n",
+ ctdb_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
DLIST_ADD(ctdb->db_list, ctdb_db);
/* setting this can help some high churn databases */
return -1;
}
+ /*
+ all databases support the "fetch_with_header" function. we need this
+ for efficient readonly record fetches
+ */
+ ret = ctdb_daemon_set_call(ctdb, ctdb_db->db_id, ctdb_fetch_with_header_func, CTDB_FETCH_WITH_HEADER_FUNC);
+ if (ret != 0) {
+ DEBUG(DEBUG_CRIT,("Failed to setup fetch function for '%s'\n", ctdb_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
ret = ctdb_vacuum_init(ctdb_db);
if (ret != 0) {
DEBUG(DEBUG_CRIT,("Failed to setup vacuuming for "
return -1;
}
+ ret = ctdb_migration_init(ctdb_db);
+ if (ret != 0) {
+ DEBUG(DEBUG_ERR,
+ ("Failed to setup migration tracking for db '%s'\n",
+ ctdb_db->db_name));
+ talloc_free(ctdb_db);
+ return -1;
+ }
+
+ ctdb_db->generation = ctdb->vnn_map->generation;
+
+ DEBUG(DEBUG_NOTICE,("Attached to database '%s' with flags 0x%x\n",
+ ctdb_db->db_path, tdb_flags));
- DEBUG(DEBUG_INFO,("Attached to database '%s'\n", ctdb_db->db_path));
-
/* success */
return 0;
}
struct ctdb_deferred_attach_context {
struct ctdb_deferred_attach_context *next, *prev;
struct ctdb_context *ctdb;
- struct ctdb_req_control *c;
+ struct ctdb_req_control_old *c;
};
return 0;
}
-static void ctdb_deferred_attach_timeout(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+static void ctdb_deferred_attach_timeout(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval t, void *private_data)
{
struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
struct ctdb_context *ctdb = da_ctx->ctdb;
talloc_free(da_ctx);
}
-static void ctdb_deferred_attach_callback(struct event_context *ev, struct timed_event *te, struct timeval t, void *private_data)
+static void ctdb_deferred_attach_callback(struct tevent_context *ev,
+ struct tevent_timer *te,
+ struct timeval t, void *private_data)
{
struct ctdb_deferred_attach_context *da_ctx = talloc_get_type(private_data, struct ctdb_deferred_attach_context);
struct ctdb_context *ctdb = da_ctx->ctdb;
*/
while ((da_ctx = ctdb->deferred_attach) != NULL) {
DLIST_REMOVE(ctdb->deferred_attach, da_ctx);
- event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(1,0), ctdb_deferred_attach_callback, da_ctx);
+ tevent_add_timer(ctdb->ev, da_ctx,
+ timeval_current_ofs(1,0),
+ ctdb_deferred_attach_callback, da_ctx);
}
return 0;
int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
TDB_DATA *outdata, uint64_t tdb_flags,
bool persistent, uint32_t client_id,
- struct ctdb_req_control *c,
+ struct ctdb_req_control_old *c,
bool *async_reply)
{
const char *db_name = (const char *)indata.dptr;
struct ctdb_db_context *db;
struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
struct ctdb_client *client = NULL;
+ bool with_jenkinshash, with_mutexes;
- /* dont allow any local clients to attach while we are in recovery mode
+ if (ctdb->tunable.allow_client_db_attach == 0) {
+ DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
+ "AllowClientDBAccess == 0\n", db_name));
+ return -1;
+ }
+
+ /* don't allow any local clients to attach while we are in recovery mode
* except for the recovery daemon.
* allow all attach from the network since these are always from remote
* recovery daemons.
*/
if (client_id != 0) {
- client = ctdb_reqid_find(ctdb, client_id, struct ctdb_client);
+ client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
}
if (client != NULL) {
/* If the node is inactive it is not part of the cluster
databases
*/
if (node->flags & NODE_FLAGS_INACTIVE) {
- DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (disconnected or banned)\n", db_name));
+ DEBUG(DEBUG_ERR,("DB Attach to database %s refused since node is inactive (flags=0x%x)\n", db_name, node->flags));
return -1;
}
- if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE
- && client->pid != ctdb->recoverd_pid
- && !ctdb->done_startup) {
+ if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE &&
+ client->pid != ctdb->recoverd_pid &&
+ ctdb->runstate < CTDB_RUNSTATE_RUNNING) {
struct ctdb_deferred_attach_context *da_ctx = talloc(client, struct ctdb_deferred_attach_context);
if (da_ctx == NULL) {
talloc_set_destructor(da_ctx, ctdb_deferred_attach_destructor);
DLIST_ADD(ctdb->deferred_attach, da_ctx);
- event_add_timed(ctdb->ev, da_ctx, timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0), ctdb_deferred_attach_timeout, da_ctx);
+ tevent_add_timer(ctdb->ev, da_ctx,
+ timeval_current_ofs(ctdb->tunable.deferred_attach_timeout, 0),
+ ctdb_deferred_attach_timeout, da_ctx);
DEBUG(DEBUG_ERR,("DB Attach to database %s deferred for client with pid:%d since node is in recovery mode.\n", db_name, client->pid));
*async_reply = true;
only allow a subset of those on the database in ctdb. Note
that tdb_flags is passed in via the (otherwise unused)
srvid to the attach control */
+#ifdef TDB_MUTEX_LOCKING
+ tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH|TDB_MUTEX_LOCKING|TDB_CLEAR_IF_FIRST);
+#else
tdb_flags &= (TDB_NOSYNC|TDB_INCOMPATIBLE_HASH);
+#endif
/* see if we already have this name */
db = ctdb_db_handle(ctdb, db_name);
if (db) {
+ if (db->persistent != persistent) {
+ DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
+ "database %s\n", persistent ? "" : "non-",
+ db-> persistent ? "" : "non-", db_name));
+ return -1;
+ }
outdata->dptr = (uint8_t *)&db->db_id;
outdata->dsize = sizeof(db->db_id);
tdb_add_flags(db->ltdb->tdb, tdb_flags);
return 0;
}
- if (ctdb_local_attach(ctdb, db_name, persistent, NULL, (tdb_flags&TDB_INCOMPATIBLE_HASH)?true:false) != 0) {
+ with_jenkinshash = (tdb_flags & TDB_INCOMPATIBLE_HASH) ? true : false;
+#ifdef TDB_MUTEX_LOCKING
+ with_mutexes = (tdb_flags & TDB_MUTEX_LOCKING) ? true : false;
+#else
+ with_mutexes = false;
+#endif
+
+ if (ctdb_local_attach(ctdb, db_name, persistent, NULL,
+ with_jenkinshash, with_mutexes) != 0) {
return -1;
}
outdata->dsize = sizeof(db->db_id);
/* Try to ensure it's locked in mem */
- ctdb_lockdown_memory(ctdb);
+ lockdown_memory(ctdb->valgrinding);
/* tell all the other nodes about this database */
ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, tdb_flags,
return 0;
}
+/*
+ * a client has asked to detach from a database
+ */
+int32_t ctdb_control_db_detach(struct ctdb_context *ctdb, TDB_DATA indata,
+ uint32_t client_id)
+{
+ uint32_t db_id;
+ struct ctdb_db_context *ctdb_db;
+ struct ctdb_client *client = NULL;
+
+ db_id = *(uint32_t *)indata.dptr;
+ ctdb_db = find_ctdb_db(ctdb, db_id);
+ if (ctdb_db == NULL) {
+ DEBUG(DEBUG_ERR, ("Invalid dbid 0x%08x in DB detach\n",
+ db_id));
+ return -1;
+ }
+
+ if (ctdb->tunable.allow_client_db_attach == 1) {
+ DEBUG(DEBUG_ERR, ("DB detach from database %s denied. "
+ "Clients are allowed access to databases "
+ "(AllowClientDBAccess == 1)\n",
+ ctdb_db->db_name));
+ return -1;
+ }
+
+ if (ctdb_db->persistent) {
+ DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
+ "denied\n", ctdb_db->db_name));
+ return -1;
+ }
+
+ /* Cannot detach from database when in recovery */
+ if (ctdb->recovery_mode == CTDB_RECOVERY_ACTIVE) {
+ DEBUG(DEBUG_ERR, ("DB detach denied while in recovery\n"));
+ return -1;
+ }
+
+ /* If a control comes from a client, then broadcast it to all nodes.
+ * Do the actual detach only if the control comes from other daemons.
+ */
+ if (client_id != 0) {
+ client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
+ if (client != NULL) {
+ /* forward the control to all the nodes */
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+ CTDB_CONTROL_DB_DETACH, 0,
+ CTDB_CTRL_FLAG_NOREPLY,
+ indata, NULL, NULL);
+ return 0;
+ }
+ DEBUG(DEBUG_ERR, ("Client has gone away. Failing DB detach "
+ "for database '%s'\n", ctdb_db->db_name));
+ return -1;
+ }
+
+ /* Detach database from recoverd */
+ if (ctdb_daemon_send_message(ctdb, ctdb->pnn,
+ CTDB_SRVID_DETACH_DATABASE,
+ indata) != 0) {
+ DEBUG(DEBUG_ERR, ("Unable to detach DB from recoverd\n"));
+ return -1;
+ }
+
+ /* Disable vacuuming and drop all vacuuming data */
+ talloc_free(ctdb_db->vacuum_handle);
+ talloc_free(ctdb_db->delete_queue);
+
+ /* Terminate any deferred fetch */
+ talloc_free(ctdb_db->deferred_fetch);
+
+ /* Terminate any traverses */
+ while (ctdb_db->traverse) {
+ talloc_free(ctdb_db->traverse);
+ }
+
+ /* Terminate any revokes */
+ while (ctdb_db->revokechild_active) {
+ talloc_free(ctdb_db->revokechild_active);
+ }
+
+ /* Free readonly tracking database */
+ if (ctdb_db->readonly) {
+ talloc_free(ctdb_db->rottdb);
+ }
+
+ DLIST_REMOVE(ctdb->db_list, ctdb_db);
+
+ DEBUG(DEBUG_NOTICE, ("Detached from database '%s'\n",
+ ctdb_db->db_name));
+ talloc_free(ctdb_db);
+
+ return 0;
+}
/*
attach to all existing persistent databases
int invalid_name = 0;
s = talloc_strdup(ctdb, de->d_name);
- CTDB_NO_MEMORY(ctdb, s);
+ if (s == NULL) {
+ closedir(d);
+ CTDB_NO_MEMORY(ctdb, s);
+ }
/* only accept names ending in .tdb */
p = strstr(s, ".tdb.");
}
p[4] = 0;
- if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, 0) != 0) {
+ if (ctdb_local_attach(ctdb, s, true, unhealthy_reason, false, false) != 0) {
DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
closedir(d);
talloc_free(s);
char *unhealthy_reason = NULL;
bool first_try = true;
- if (ctdb->db_directory == NULL) {
- ctdb->db_directory = VARDIR "/ctdb";
- }
- if (ctdb->db_directory_persistent == NULL) {
- ctdb->db_directory_persistent = VARDIR "/ctdb/persistent";
- }
- if (ctdb->db_directory_state == NULL) {
- ctdb->db_directory_state = VARDIR "/ctdb/state";
- }
-
- /* make sure the db directory exists */
- ret = mkdir(ctdb->db_directory, 0700);
- if (ret == -1 && errno != EEXIST) {
- DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb directory '%s'\n",
- ctdb->db_directory));
- return -1;
- }
-
- /* make sure the persistent db directory exists */
- ret = mkdir(ctdb->db_directory_persistent, 0700);
- if (ret == -1 && errno != EEXIST) {
- DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb persistent directory '%s'\n",
- ctdb->db_directory_persistent));
- return -1;
- }
-
- /* make sure the internal state db directory exists */
- ret = mkdir(ctdb->db_directory_state, 0700);
- if (ret == -1 && errno != EEXIST) {
- DEBUG(DEBUG_CRIT,(__location__ " Unable to create ctdb state directory '%s'\n",
- ctdb->db_directory_state));
- return -1;
- }
-
persistent_health_path = talloc_asprintf(ctdb, "%s/%s.%u",
ctdb->db_directory_state,
PERSISTENT_HEALTH_TDB,
/*
timer to check for seqnum changes in a ltdb and propogate them
*/
-static void ctdb_ltdb_seqnum_check(struct event_context *ev, struct timed_event *te,
+static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
+ struct tevent_timer *te,
struct timeval t, void *p)
{
struct ctdb_db_context *ctdb_db = talloc_get_type(p, struct ctdb_db_context);
/* setup a new timer */
ctdb_db->seqnum_update =
- event_add_timed(ctdb->ev, ctdb_db,
- timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
- ctdb_ltdb_seqnum_check, ctdb_db);
+ tevent_add_timer(ctdb->ev, ctdb_db,
+ timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
+ (ctdb->tunable.seqnum_interval%1000)*1000),
+ ctdb_ltdb_seqnum_check, ctdb_db);
}
/*
}
if (ctdb_db->seqnum_update == NULL) {
- ctdb_db->seqnum_update =
- event_add_timed(ctdb->ev, ctdb_db,
- timeval_current_ofs(ctdb->tunable.seqnum_interval/1000, (ctdb->tunable.seqnum_interval%1000)*1000),
- ctdb_ltdb_seqnum_check, ctdb_db);
+ ctdb_db->seqnum_update = tevent_add_timer(
+ ctdb->ev, ctdb_db,
+ timeval_current_ofs(ctdb->tunable.seqnum_interval/1000,
+ (ctdb->tunable.seqnum_interval%1000)*1000),
+ ctdb_ltdb_seqnum_check, ctdb_db);
}
tdb_enable_seqnum(ctdb_db->ltdb->tdb);
return 0;
}
-int32_t ctdb_control_set_db_priority(struct ctdb_context *ctdb, TDB_DATA indata)
+int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
+{
+ if (ctdb_db->sticky) {
+ return 0;
+ }
+
+ if (ctdb_db->persistent) {
+ DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+ return -1;
+ }
+
+ ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
+
+ ctdb_db->sticky = true;
+
+ DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));
+
+ return 0;
+}
+
+void ctdb_db_statistics_reset(struct ctdb_db_context *ctdb_db)
+{
+ struct ctdb_db_statistics_old *s = &ctdb_db->statistics;
+ int i;
+
+ for (i=0; i<MAX_HOT_KEYS; i++) {
+ if (s->hot_keys[i].key.dsize > 0) {
+ talloc_free(s->hot_keys[i].key.dptr);
+ }
+ }
+
+ ZERO_STRUCT(ctdb_db->statistics);
+}
+
+int32_t ctdb_control_get_db_statistics(struct ctdb_context *ctdb,
+ uint32_t db_id,
+ TDB_DATA *outdata)
{
- struct ctdb_db_priority *db_prio = (struct ctdb_db_priority *)indata.dptr;
struct ctdb_db_context *ctdb_db;
+ struct ctdb_db_statistics_old *stats;
+ int i;
+ int len;
+ char *ptr;
- ctdb_db = find_ctdb_db(ctdb, db_prio->db_id);
+ ctdb_db = find_ctdb_db(ctdb, db_id);
if (!ctdb_db) {
- DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in ctdb_set_db_priority\n", db_prio->db_id));
- return 0;
+ DEBUG(DEBUG_ERR,("Unknown db_id 0x%x in get_db_statistics\n", db_id));
+ return -1;
}
- if ((db_prio->priority<1) || (db_prio->priority>NUM_DB_PRIORITIES)) {
- DEBUG(DEBUG_ERR,("Trying to set invalid priority : %u\n", db_prio->priority));
- return 0;
+ len = offsetof(struct ctdb_db_statistics_old, hot_keys_wire);
+ for (i = 0; i < MAX_HOT_KEYS; i++) {
+ len += ctdb_db->statistics.hot_keys[i].key.dsize;
+ }
+
+ stats = talloc_size(outdata, len);
+ if (stats == NULL) {
+ DEBUG(DEBUG_ERR,("Failed to allocate db statistics structure\n"));
+ return -1;
}
- ctdb_db->priority = db_prio->priority;
- DEBUG(DEBUG_INFO,("Setting DB priority to %u for db 0x%08x\n", db_prio->priority, db_prio->db_id));
+ memcpy(stats, &ctdb_db->statistics,
+ offsetof(struct ctdb_db_statistics_old, hot_keys_wire));
+
+ stats->num_hot_keys = MAX_HOT_KEYS;
+
+ ptr = &stats->hot_keys_wire[0];
+ for (i = 0; i < MAX_HOT_KEYS; i++) {
+ memcpy(ptr, ctdb_db->statistics.hot_keys[i].key.dptr,
+ ctdb_db->statistics.hot_keys[i].key.dsize);
+ ptr += ctdb_db->statistics.hot_keys[i].key.dsize;
+ }
+
+ outdata->dptr = (uint8_t *)stats;
+ outdata->dsize = len;
return 0;
}
-