#include "common/common.h"
#include "common/logging.h"
+#include "server/ctdb_config.h"
+
#define PERSISTENT_HEALTH_TDB "persistent_health.tdb"
/**
TDB_DATA rec[2];
uint32_t hsize = sizeof(struct ctdb_ltdb_header);
int ret;
- bool seqnum_suppressed = false;
bool keep = false;
bool schedule_for_deletion = false;
bool remove_from_delete_queue = false;
}
if (keep) {
- if (!ctdb_db->persistent &&
+ if (ctdb_db_volatile(ctdb_db) &&
(ctdb_db->ctdb->pnn == header->dmaster) &&
!(header->flags & CTDB_REC_RO_FLAGS))
{
rec[1].dsize = data.dsize;
rec[1].dptr = data.dptr;
- /* Databases with seqnum updates enabled only get their seqnum
- changes when/if we modify the data */
- if (ctdb_db->seqnum_update != NULL) {
- TDB_DATA old;
- old = tdb_fetch(ctdb_db->ltdb->tdb, key);
-
- if ((old.dsize == hsize + data.dsize) &&
- memcmp(old.dptr + hsize, data.dptr, data.dsize) == 0) {
- tdb_remove_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
- seqnum_suppressed = true;
- }
- if (old.dptr != NULL) {
- free(old.dptr);
- }
- }
-
DEBUG(DEBUG_DEBUG, (__location__ " db[%s]: %s record: hash[0x%08x]\n",
ctdb_db->db_name,
keep?"storing":"deleting",
schedule_for_deletion = false;
remove_from_delete_queue = false;
}
- if (seqnum_suppressed) {
- tdb_add_flags(ctdb_db->ltdb->tdb, TDB_SEQNUM);
- }
if (schedule_for_deletion) {
int ret2;
ret = ctdb_ltdb_lock_requeue(ctdb_db, key, hdr, recv_pkt,
recv_context, ignore_generation);
- if (ret == 0) {
- ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
- if (ret != 0) {
- int uret;
- uret = ctdb_ltdb_unlock(ctdb_db, key);
- if (uret != 0) {
- DEBUG(DEBUG_ERR,(__location__ " ctdb_ltdb_unlock() failed with error %d\n", uret));
- }
+ if (ret != 0) {
+ return ret;
+ }
+
+ ret = ctdb_ltdb_fetch(ctdb_db, key, header, hdr, data);
+ if (ret != 0) {
+ int uret;
+ uret = ctdb_ltdb_unlock(ctdb_db, key);
+ if (uret != 0) {
+ DBG_ERR("ctdb_ltdb_unlock() failed with error %d\n",
+ uret);
}
}
return ret;
/*
- paraoid check to see if the db is empty
+ paranoid check to see if the db is empty
*/
static void ctdb_check_db_empty(struct ctdb_db_context *ctdb_db)
{
int fail = 0;
for (ctdb_db = ctdb->db_list; ctdb_db; ctdb_db = ctdb_db->next) {
- if (!ctdb_db->persistent) {
+ if (!ctdb_db_persistent(ctdb_db)) {
continue;
}
{
char *ropath;
- if (ctdb_db->readonly) {
+ if (ctdb_db_readonly(ctdb_db)) {
return 0;
}
- if (ctdb_db->persistent) {
- DEBUG(DEBUG_ERR,("Persistent databases do not support readonly property\n"));
+ if (! ctdb_db_volatile(ctdb_db)) {
+ DEBUG(DEBUG_ERR,
+ ("Non-volatile databases do not support readonly flag\n"));
return -1;
}
DEBUG(DEBUG_NOTICE,("OPENED tracking database : '%s'\n", ropath));
- ctdb_db->readonly = true;
+ ctdb_db_set_readonly(ctdb_db);
DEBUG(DEBUG_NOTICE, ("Readonly property set on DB %s\n", ctdb_db->db_name));
return 0 on success, -1 on failure
*/
static int ctdb_local_attach(struct ctdb_context *ctdb, const char *db_name,
- bool persistent, const char *unhealthy_reason)
+ uint8_t db_flags, const char *unhealthy_reason)
{
struct ctdb_db_context *ctdb_db, *tmp_db;
int ret;
int tdb_flags;
int mode = 0600;
int remaining_tries = 0;
- uint8_t db_flags = 0;
ctdb_db = talloc_zero(ctdb, struct ctdb_db_context);
CTDB_NO_MEMORY(ctdb, ctdb_db);
key.dsize = strlen(db_name)+1;
key.dptr = discard_const(db_name);
ctdb_db->db_id = ctdb_hash(&key);
- ctdb_db->persistent = persistent;
+ ctdb_db->db_flags = db_flags;
- if (!ctdb_db->persistent) {
+ if (ctdb_db_volatile(ctdb_db)) {
ctdb_db->delete_queue = trbt_create(ctdb_db, 0);
if (ctdb_db->delete_queue == NULL) {
CTDB_NO_MEMORY(ctdb, ctdb_db->delete_queue);
}
}
- if (persistent) {
+ if (ctdb_db_persistent(ctdb_db)) {
if (unhealthy_reason) {
ret = ctdb_update_persistent_health(ctdb, ctdb_db,
unhealthy_reason, 0);
}
/* open the database */
- ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
- persistent?ctdb->db_directory_persistent:ctdb->db_directory,
+ ctdb_db->db_path = talloc_asprintf(ctdb_db, "%s/%s.%u",
+ ctdb_db_persistent(ctdb_db) ?
+ ctdb->db_directory_persistent :
+ ctdb->db_directory,
db_name, ctdb->pnn);
- if (persistent) {
- db_flags = CTDB_DB_FLAGS_PERSISTENT;
- }
-
- tdb_flags = ctdb_db_tdb_flags(db_flags, ctdb->valgrinding,
- ctdb->tunable.mutex_enabled);
+ tdb_flags = ctdb_db_tdb_flags(db_flags,
+ ctdb->valgrinding,
+ ctdb_config.tdb_mutexes);
again:
ctdb_db->ltdb = tdb_wrap_open(ctdb_db, ctdb_db->db_path,
struct stat st;
int saved_errno = errno;
- if (!persistent) {
+ if (! ctdb_db_persistent(ctdb_db)) {
DEBUG(DEBUG_CRIT,("Failed to open tdb '%s': %d - %s\n",
ctdb_db->db_path,
saved_errno,
goto again;
}
- if (!persistent) {
+ if (!ctdb_db_persistent(ctdb_db)) {
ctdb_check_db_empty(ctdb_db);
} else {
ret = tdb_check(ctdb_db->ltdb->tdb, NULL, NULL);
/*
a client has asked to attach a new database
*/
-int32_t ctdb_control_db_attach(struct ctdb_context *ctdb, TDB_DATA indata,
+int32_t ctdb_control_db_attach(struct ctdb_context *ctdb,
+ TDB_DATA indata,
TDB_DATA *outdata,
- bool persistent, uint32_t client_id,
+ uint8_t db_flags,
+ uint32_t srcnode,
+ uint32_t client_id,
struct ctdb_req_control_old *c,
bool *async_reply)
{
struct ctdb_db_context *db;
struct ctdb_node *node = ctdb->nodes[ctdb->pnn];
struct ctdb_client *client = NULL;
+ uint32_t opcode;
if (ctdb->tunable.allow_client_db_attach == 0) {
DEBUG(DEBUG_ERR, ("DB Attach to database %s denied by tunable "
* allow all attach from the network since these are always from remote
* recovery daemons.
*/
- if (client_id != 0) {
+ if (srcnode == ctdb->pnn && client_id != 0) {
client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
}
if (client != NULL) {
/* see if we already have this name */
db = ctdb_db_handle(ctdb, db_name);
if (db) {
- if (db->persistent != persistent) {
- DEBUG(DEBUG_ERR, ("ERROR: DB Attach %spersistent to %spersistent "
- "database %s\n", persistent ? "" : "non-",
- db-> persistent ? "" : "non-", db_name));
+ if ((db->db_flags & db_flags) != db_flags) {
+ DEBUG(DEBUG_ERR,
+ ("Error: Failed to re-attach with 0x%x flags,"
+ " database has 0x%x flags\n", db_flags,
+ db->db_flags));
return -1;
}
outdata->dptr = (uint8_t *)&db->db_id;
return 0;
}
- if (ctdb_local_attach(ctdb, db_name, persistent, NULL) != 0) {
+ if (ctdb_local_attach(ctdb, db_name, db_flags, NULL) != 0) {
return -1;
}
/* Try to ensure it's locked in mem */
lockdown_memory(ctdb->valgrinding);
+ if (ctdb_db_persistent(db)) {
+ opcode = CTDB_CONTROL_DB_ATTACH_PERSISTENT;
+ } else if (ctdb_db_replicated(db)) {
+ opcode = CTDB_CONTROL_DB_ATTACH_REPLICATED;
+ } else {
+ opcode = CTDB_CONTROL_DB_ATTACH;
+ }
+
/* tell all the other nodes about this database */
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
- persistent?CTDB_CONTROL_DB_ATTACH_PERSISTENT:
- CTDB_CONTROL_DB_ATTACH,
+ ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_CONNECTED, 0, opcode,
0, CTDB_CTRL_FLAG_NOREPLY,
indata, NULL, NULL);
return -1;
}
- if (ctdb_db->persistent) {
- DEBUG(DEBUG_ERR, ("DB detach from persistent database %s "
- "denied\n", ctdb_db->db_name));
+ if (! ctdb_db_volatile(ctdb_db)) {
+ DEBUG(DEBUG_ERR,
+ ("Detaching non-volatile database %s denied\n",
+ ctdb_db->db_name));
return -1;
}
client = reqid_find(ctdb->idr, client_id, struct ctdb_client);
if (client != NULL) {
/* forward the control to all the nodes */
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_ALL, 0,
+ ctdb_daemon_send_control(ctdb,
+ CTDB_BROADCAST_CONNECTED, 0,
CTDB_CONTROL_DB_DETACH, 0,
CTDB_CTRL_FLAG_NOREPLY,
indata, NULL, NULL);
}
/* Free readonly tracking database */
- if (ctdb_db->readonly) {
+ if (ctdb_db_readonly(ctdb_db)) {
talloc_free(ctdb_db->rottdb);
}
}
p[4] = 0;
- if (ctdb_local_attach(ctdb, s, true, unhealthy_reason) != 0) {
+ if (ctdb_local_attach(ctdb, s, CTDB_DB_FLAGS_PERSISTENT, unhealthy_reason) != 0) {
DEBUG(DEBUG_ERR,("Failed to attach to persistent database '%s'\n", de->d_name));
closedir(d);
talloc_free(s);
}
/*
- timer to check for seqnum changes in a ltdb and propogate them
+ timer to check for seqnum changes in a ltdb and propagate them
*/
static void ctdb_ltdb_seqnum_check(struct tevent_context *ev,
struct tevent_timer *te,
struct ctdb_context *ctdb = ctdb_db->ctdb;
uint32_t new_seqnum = tdb_get_seqnum(ctdb_db->ltdb->tdb);
if (new_seqnum != ctdb_db->seqnum) {
- /* something has changed - propogate it */
+ /* something has changed - propagate it */
TDB_DATA data;
data.dptr = (uint8_t *)&ctdb_db->db_id;
data.dsize = sizeof(uint32_t);
- ctdb_daemon_send_control(ctdb, CTDB_BROADCAST_VNNMAP, 0,
- CTDB_CONTROL_UPDATE_SEQNUM, 0, CTDB_CTRL_FLAG_NOREPLY,
- data, NULL, NULL);
+ ctdb_daemon_send_control(ctdb,
+ CTDB_BROADCAST_ACTIVE,
+ 0,
+ CTDB_CONTROL_UPDATE_SEQNUM,
+ 0,
+ CTDB_CTRL_FLAG_NOREPLY,
+ data,
+ NULL,
+ NULL);
}
ctdb_db->seqnum = new_seqnum;
int ctdb_set_db_sticky(struct ctdb_context *ctdb, struct ctdb_db_context *ctdb_db)
{
- if (ctdb_db->sticky) {
+ if (ctdb_db_sticky(ctdb_db)) {
return 0;
}
- if (ctdb_db->persistent) {
- DEBUG(DEBUG_ERR,("Trying to set persistent database with sticky property\n"));
+ if (! ctdb_db_volatile(ctdb_db)) {
+ DEBUG(DEBUG_ERR,
+ ("Non-volatile databases do not support sticky flag\n"));
return -1;
}
ctdb_db->sticky_records = trbt_create(ctdb_db, 0);
- ctdb_db->sticky = true;
+ ctdb_db_set_sticky(ctdb_db);
DEBUG(DEBUG_NOTICE,("set db sticky %s\n", ctdb_db->db_name));